This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b740fddff [INLONG-4958][Agent] Unify the exposed metrics for the Agent
(#5301)
b740fddff is described below
commit b740fddff8bd2fb52079e244881bd4653b17c6e0
Author: Greedyu <[email protected]>
AuthorDate: Tue Aug 2 20:37:30 2022 +0800
[INLONG-4958][Agent] Unify the exposed metrics for the Agent (#5301)
---
.../inlong/agent/constant/AgentConstants.java | 10 ++-
.../agent/metrics/AgentJmxMetricListener.java} | 45 +++++-----
.../agent/metrics/AgentMetricBaseListener.java} | 48 ++++++-----
.../inlong/agent/metrics/AgentMetricSingleton.java | 67 +++++++++++++++
.../metrics/AgentPrometheusMetricListener.java | 71 ++++++++++++++++
.../agent/metrics/global}/GlobalMetrics.java | 98 +++++++++-------------
.../agent/metrics/global/JmxGlobalMetrics.java | 43 ++++++++++
.../metrics/global/PrometheusGlobalMetrics.java | 43 ++++++++++
.../inlong/agent/metrics}/job/JobJmxMetrics.java | 8 +-
.../inlong/agent/metrics}/job/JobMetrics.java | 2 +-
.../agent/metrics}/job/JobPrometheusMetrics.java | 2 +-
.../agent/metrics/plugin}/PluginJmxMetric.java | 2 +-
.../inlong/agent/metrics/plugin}/PluginMetric.java | 2 +-
.../metrics/plugin}/PluginPrometheusMetric.java | 2 +-
.../inlong/agent/metrics/sink}/SinkJmxMetric.java | 2 +-
.../inlong/agent/metrics/sink}/SinkMetric.java | 2 +-
.../agent/metrics/sink}/SinkPrometheusMetric.java | 2 +-
.../agent/metrics/source}/SourceJmxMetric.java | 2 +-
.../inlong/agent/metrics/source}/SourceMetric.java | 2 +-
.../metrics/source}/SourcePrometheusMetric.java | 2 +-
.../inlong/agent/metrics}/task/TaskJmxMetrics.java | 8 +-
.../inlong/agent/metrics}/task/TaskMetrics.java | 2 +-
.../agent/metrics}/task/TaskPrometheusMetrics.java | 2 +-
.../org/apache/inlong/agent/utils/ConfigUtil.java | 39 ---------
.../org/apache/inlong/agent/core/AgentMain.java | 30 ++-----
.../apache/inlong/agent/core/job/JobManager.java | 9 +-
.../apache/inlong/agent/core/task/TaskManager.java | 9 +-
.../inlong/agent/core/TestTaskJmxMetrics.java | 5 +-
inlong-agent/agent-docker/agent-docker.sh | 4 +-
.../inlong/agent/plugin/channel/MemoryChannel.java | 22 ++---
.../inlong/agent/plugin/sinks/ConsoleSink.java | 7 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 6 +-
.../inlong/agent/plugin/sinks/SenderManager.java | 4 +-
.../inlong/agent/plugin/sources/BinlogSource.java | 4 +-
.../agent/plugin/sources/DatabaseSqlSource.java | 6 +-
.../inlong/agent/plugin/sources/KafkaSource.java | 6 +-
.../agent/plugin/sources/TextFileSource.java | 4 +-
.../agent/plugin/sources/reader/BinlogReader.java | 4 +-
.../agent/plugin/sources/reader/KafkaReader.java | 4 +-
.../agent/plugin/sources/reader/SqlReader.java | 6 +-
.../plugin/sources/reader/TextFileReader.java | 6 +-
.../org/apache/inlong/agent/plugin/MiniAgent.java | 2 -
.../agent/plugin/metrics/GlobalMetricsTest.java | 35 ++++----
.../apache/inlong/agent/plugin/sinks/MockSink.java | 6 +-
.../agent/plugin/sources/TestTextFileReader.java | 7 --
inlong-agent/bin/agent-env.sh | 2 +-
inlong-agent/conf/agent.properties | 22 ++---
47 files changed, 427 insertions(+), 289 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 6c90f7f11..ae7a564d8 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.constant;
+import org.apache.inlong.agent.metrics.AgentMetricSingleton;
+import org.apache.inlong.agent.metrics.global.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
/**
@@ -173,9 +175,6 @@ public class AgentConstants {
public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
- public static final String PROMETHEUS_ENABLE = "agent.prometheus.enable";
- public static final boolean DEFAULT_PROMETHEUS_ENABLE = false;
-
public static final String PROMETHEUS_EXPORTER_PORT =
"agent.prometheus.exporter.port";
public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
@@ -194,4 +193,9 @@ public class AgentConstants {
public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit";
public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
+ public static final String AGENT_METRIC_LISTENER_CLASS =
"agent.domainListeners";
+ public static final String AGENT_METRIC_LISTENER_CLASS_DEFAULT =
+ "org.apache.inlong.agent.metrics.AgentPrometheusMetricListener";
+
+ public static final GlobalMetrics GLOBAL_METRICS =
AgentMetricSingleton.getAgentMetricHandler().globalMetrics;
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentJmxMetricListener.java
similarity index 54%
copy from
inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
copy to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentJmxMetricListener.java
index c98da4e09..71a8aedcf 100644
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentJmxMetricListener.java
@@ -15,27 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.core;
-
-import org.apache.inlong.agent.core.task.TaskJmxMetrics;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestTaskJmxMetrics {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(AgentBaseTestsHelper.class);
-
- @Test
- public void testAgentMetrics() {
- try {
- TaskJmxMetrics taskJmxMetrics = TaskJmxMetrics.create();
- taskJmxMetrics.incRetryingTaskCount();
- Assert.assertEquals(taskJmxMetrics.module, "AgentTaskMetric");
- } catch (Exception ex) {
- LOGGER.error("error happens" + ex);
- }
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.agent.metrics.global.JmxGlobalMetrics;
+import org.apache.inlong.agent.metrics.job.JobJmxMetrics;
+import org.apache.inlong.agent.metrics.task.TaskJmxMetrics;
+
+/**
+ * JMX metric listener
+ */
+public class AgentJmxMetricListener extends AgentMetricBaseListener {
+
+ public AgentJmxMetricListener() {
+ jobMetrics = JobJmxMetrics.create();
+ taskMetrics = TaskJmxMetrics.create();
+ globalMetrics = new JmxGlobalMetrics();
}
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricBaseListener.java
similarity index 51%
copy from
inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
copy to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricBaseListener.java
index c98da4e09..a9b216c2c 100644
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricBaseListener.java
@@ -15,27 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.core;
-
-import org.apache.inlong.agent.core.task.TaskJmxMetrics;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestTaskJmxMetrics {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(AgentBaseTestsHelper.class);
-
- @Test
- public void testAgentMetrics() {
- try {
- TaskJmxMetrics taskJmxMetrics = TaskJmxMetrics.create();
- taskJmxMetrics.incRetryingTaskCount();
- Assert.assertEquals(taskJmxMetrics.module, "AgentTaskMetric");
- } catch (Exception ex) {
- LOGGER.error("error happens" + ex);
- }
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.agent.metrics.global.GlobalMetrics;
+import org.apache.inlong.agent.metrics.job.JobMetrics;
+import org.apache.inlong.agent.metrics.task.TaskMetrics;
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+
+import java.util.List;
+
+/**
+ * Base class for Agent metric listeners
+ */
+public abstract class AgentMetricBaseListener implements MetricListener {
+
+ public JobMetrics jobMetrics;
+
+ public TaskMetrics taskMetrics;
+
+ public GlobalMetrics globalMetrics;
+
+ @Override
+ public void snapshot(String domain, List<MetricItemValue> itemValues) {
+ // nothing
}
+ public abstract void init();
+
+ public abstract void close();
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricSingleton.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricSingleton.java
new file mode 100644
index 000000000..dbe309e6c
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricSingleton.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_METRIC_LISTENER_CLASS;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_METRIC_LISTENER_CLASS_DEFAULT;
+
+/**
+ * Singleton holder for the configured Agent metric listener
+ */
+public class AgentMetricSingleton {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentMetricSingleton.class);
+ private static volatile AgentMetricBaseListener agentMetricBaseHandler;
+
+ private AgentMetricSingleton() {
+ }
+
+ public static AgentMetricBaseListener getAgentMetricHandler() {
+ if (agentMetricBaseHandler == null) {
+ synchronized (AgentJmxMetricListener.class) {
+ if (agentMetricBaseHandler == null) {
+ agentMetricBaseHandler = getAgentMetricByConf();
+ agentMetricBaseHandler.init();
+ if (agentMetricBaseHandler != null) {
+ LOGGER.info("metric class {} was initialized
successfully",
+
agentMetricBaseHandler.getClass().getSimpleName());
+ }
+ }
+ }
+ }
+ return agentMetricBaseHandler;
+ }
+
+ private static AgentMetricBaseListener getAgentMetricByConf() {
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ try {
+ Class<?> handlerClass = ClassUtils
+ .getClass(conf.get(AGENT_METRIC_LISTENER_CLASS,
AGENT_METRIC_LISTENER_CLASS_DEFAULT));
+ Object handlerObject =
handlerClass.getDeclaredConstructor().newInstance();
+ return (AgentMetricBaseListener) handlerObject;
+ } catch (Exception ex) {
+ LOGGER.error("cannot find AgentMetricBaseHandler, {}",
ex.getMessage());
+ }
+ return null;
+ }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
new file mode 100644
index 000000000..35083968f
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.global.PrometheusGlobalMetrics;
+import org.apache.inlong.agent.metrics.job.JobPrometheusMetrics;
+import org.apache.inlong.agent.metrics.task.TaskPrometheusMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT;
+import static
org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT;
+
+/**
+ * Prometheus metric listener
+ */
+public class AgentPrometheusMetricListener extends AgentMetricBaseListener {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
+ private static HTTPServer metricsServer;
+
+ static {
+ DefaultExports.initialize();
+ }
+
+ public AgentPrometheusMetricListener() {
+ jobMetrics = new JobPrometheusMetrics();
+ taskMetrics = new TaskPrometheusMetrics();
+ globalMetrics = new PrometheusGlobalMetrics();
+ }
+
+ @Override
+ public void init() {
+ // starting metrics server
+ int metricsServerPort = AgentConfiguration.getAgentConf()
+ .getInt(PROMETHEUS_EXPORTER_PORT,
DEFAULT_PROMETHEUS_EXPORTER_PORT);
+ LOGGER.info("Starting prometheus metrics server on port {}",
metricsServerPort);
+ try {
+ metricsServer = new HTTPServer(metricsServerPort);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (metricsServer != null) {
+ metricsServer.close();
+ }
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/GlobalMetrics.java
similarity index 52%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/GlobalMetrics.java
index 9bd691e85..db53b69fb 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/GlobalMetrics.java
@@ -15,152 +15,130 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.global;
-import org.apache.inlong.agent.utils.ConfigUtil;
+import org.apache.inlong.agent.metrics.plugin.PluginMetric;
+import org.apache.inlong.agent.metrics.sink.SinkMetric;
+import org.apache.inlong.agent.metrics.source.SourceMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-public class GlobalMetrics {
+/**
+ * Global Metrics
+ */
+public abstract class GlobalMetrics {
private static final Logger LOGGER =
LoggerFactory.getLogger(GlobalMetrics.class);
// key: groupId_streamId
- private static final ConcurrentHashMap<String, PluginMetric> pluginMetrics
= new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, PluginMetric> pluginMetrics = new
ConcurrentHashMap<>();
// key: sourceType_groupId_streamId
- private static final ConcurrentHashMap<String, SourceMetric> sourceMetrics
= new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, SourceMetric> sourceMetrics = new
ConcurrentHashMap<>();
// key: sinkType_groupId_streamId
- private static final ConcurrentHashMap<String, SinkMetric> sinkMetrics =
new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, SinkMetric> sinkMetrics = new
ConcurrentHashMap<>();
- private static PluginMetric getPluginMetric(String tagName) {
+ private PluginMetric getPluginMetric(String tagName) {
return pluginMetrics.computeIfAbsent(tagName, (key) ->
addPluginMetric(tagName));
}
- private static PluginMetric addPluginMetric(String tagName) {
- PluginMetric metric;
- if (ConfigUtil.isPrometheusEnabled()) {
- metric = new PluginPrometheusMetric(tagName);
- } else {
- metric = new PluginJmxMetric(tagName);
- }
- LOGGER.info("add {} pluginMetrics", tagName);
- return metric;
- }
+ protected abstract PluginMetric addPluginMetric(String tagName);
- private static SourceMetric getSourceMetric(String tagName) {
+ private SourceMetric getSourceMetric(String tagName) {
return sourceMetrics.computeIfAbsent(tagName, (key) ->
addSourceMetric(tagName));
}
- private static SourceMetric addSourceMetric(String tagName) {
- SourceMetric metric;
- if (ConfigUtil.isPrometheusEnabled()) {
- metric = new SourcePrometheusMetric(tagName);
- } else {
- metric = new SourceJmxMetric(tagName);
- }
- LOGGER.info("add {} sourceMetric", tagName);
- return metric;
- }
+ protected abstract SourceMetric addSourceMetric(String tagName);
- private static SinkMetric getSinkMetric(String tagName) {
+ private SinkMetric getSinkMetric(String tagName) {
return sinkMetrics.computeIfAbsent(tagName, (key) ->
addSinkMetric(tagName));
}
- private static SinkMetric addSinkMetric(String tagName) {
- SinkMetric metric;
- if (ConfigUtil.isPrometheusEnabled()) {
- metric = new SinkPrometheusMetric(tagName);
- } else {
- metric = new SinkJmxMetric(tagName);
- }
- LOGGER.info("add {} sinkMetric", tagName);
- return metric;
- }
+ protected abstract SinkMetric addSinkMetric(String tagName);
- public static void incReadNum(String tagName) {
+ public void incReadNum(String tagName) {
getPluginMetric(tagName).incReadNum();
}
- public static long getReadNum(String tagName) {
+ public long getReadNum(String tagName) {
return getPluginMetric(tagName).getReadNum();
}
- public static void incSendNum(String tagName) {
+ public void incSendNum(String tagName) {
getPluginMetric(tagName).incSendNum();
}
- public static long getSendNum(String tagName) {
+ public long getSendNum(String tagName) {
return getPluginMetric(tagName).getReadNum();
}
- public static void incReadFailedNum(String tagName) {
+ public void incReadFailedNum(String tagName) {
getPluginMetric(tagName).incReadFailedNum();
}
- public static long getReadFailedNum(String tagName) {
+ public long getReadFailedNum(String tagName) {
return getPluginMetric(tagName).getReadFailedNum();
}
- public static void incSendFailedNum(String tagName) {
+ public void incSendFailedNum(String tagName) {
getPluginMetric(tagName).incSendFailedNum();
}
- public static long getSendFailedNum(String tagName) {
+ public long getSendFailedNum(String tagName) {
return getPluginMetric(tagName).getSendFailedNum();
}
- public static void incReadSuccessNum(String tagName) {
+ public void incReadSuccessNum(String tagName) {
getPluginMetric(tagName).incReadSuccessNum();
}
- public static long getReadSuccessNum(String tagName) {
+ public long getReadSuccessNum(String tagName) {
return getPluginMetric(tagName).getReadSuccessNum();
}
- public static void incSendSuccessNum(String tagName) {
+ public void incSendSuccessNum(String tagName) {
getPluginMetric(tagName).incSendSuccessNum();
}
- public static void incSendSuccessNum(String tagName, int delta) {
+ public void incSendSuccessNum(String tagName, int delta) {
getPluginMetric(tagName).incSendSuccessNum(delta);
}
- public static long getSendSuccessNum(String tagName) {
+ public long getSendSuccessNum(String tagName) {
return getPluginMetric(tagName).getSendSuccessNum();
}
- public static void incSinkSuccessCount(String tagName) {
+ public void incSinkSuccessCount(String tagName) {
getSinkMetric(tagName).incSinkSuccessCount();
}
- public static long getSinkSuccessCount(String tagName) {
+ public long getSinkSuccessCount(String tagName) {
return getSinkMetric(tagName).getSinkSuccessCount();
}
- public static void incSinkFailCount(String tagName) {
+ public void incSinkFailCount(String tagName) {
getSinkMetric(tagName).incSinkFailCount();
}
- public static long getSinkFailCount(String tagName) {
+ public long getSinkFailCount(String tagName) {
return getSinkMetric(tagName).getSinkFailCount();
}
- public static void incSourceSuccessCount(String tagName) {
+ public void incSourceSuccessCount(String tagName) {
getSourceMetric(tagName).incSourceSuccessCount();
}
- public static long getSourceSuccessCount(String tagName) {
+ public long getSourceSuccessCount(String tagName) {
return getSourceMetric(tagName).getSourceSuccessCount();
}
- public static void incSourceFailCount(String tagName) {
+ public void incSourceFailCount(String tagName) {
getSourceMetric(tagName).incSourceFailCount();
}
- public static void showMemoryChannelStatics() {
+ public void showMemoryChannelStatics() {
for (Entry<String, PluginMetric> entry : pluginMetrics.entrySet()) {
LOGGER.info("tagName:{} ### readNum: {}, readSuccessNum: {},
readFailedNum: {}, sendSuccessNum: {}, "
+ "sendFailedNum: {}", entry.getKey(),
entry.getValue().getReadNum(),
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/JmxGlobalMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/JmxGlobalMetrics.java
new file mode 100644
index 000000000..774c98df5
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/JmxGlobalMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics.global;
+
+import org.apache.inlong.agent.metrics.plugin.PluginJmxMetric;
+import org.apache.inlong.agent.metrics.plugin.PluginMetric;
+import org.apache.inlong.agent.metrics.sink.SinkJmxMetric;
+import org.apache.inlong.agent.metrics.sink.SinkMetric;
+import org.apache.inlong.agent.metrics.source.SourceJmxMetric;
+import org.apache.inlong.agent.metrics.source.SourceMetric;
+
+public class JmxGlobalMetrics extends GlobalMetrics {
+
+ @Override
+ protected PluginMetric addPluginMetric(String tagName) {
+ return new PluginJmxMetric(tagName);
+ }
+
+ @Override
+ protected SourceMetric addSourceMetric(String tagName) {
+ return new SourceJmxMetric(tagName);
+ }
+
+ @Override
+ protected SinkMetric addSinkMetric(String tagName) {
+ return new SinkJmxMetric(tagName);
+ }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/PrometheusGlobalMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/PrometheusGlobalMetrics.java
new file mode 100644
index 000000000..48a525566
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/global/PrometheusGlobalMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics.global;
+
+import org.apache.inlong.agent.metrics.plugin.PluginMetric;
+import org.apache.inlong.agent.metrics.plugin.PluginPrometheusMetric;
+import org.apache.inlong.agent.metrics.sink.SinkMetric;
+import org.apache.inlong.agent.metrics.sink.SinkPrometheusMetric;
+import org.apache.inlong.agent.metrics.source.SourceMetric;
+import org.apache.inlong.agent.metrics.source.SourcePrometheusMetric;
+
+public class PrometheusGlobalMetrics extends GlobalMetrics {
+
+ @Override
+ protected PluginMetric addPluginMetric(String tagName) {
+ return new PluginPrometheusMetric(tagName);
+ }
+
+ @Override
+ protected SourceMetric addSourceMetric(String tagName) {
+ return new SourcePrometheusMetric(tagName);
+ }
+
+ @Override
+ protected SinkMetric addSinkMetric(String tagName) {
+ return new SinkPrometheusMetric(tagName);
+ }
+}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobJmxMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobJmxMetrics.java
similarity index 98%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobJmxMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobJmxMetrics.java
index 8c8da04c3..9feb6ab33 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobJmxMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobJmxMetrics.java
@@ -17,10 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.job;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+package org.apache.inlong.agent.metrics.job;
import org.apache.inlong.common.metric.CountMetric;
import org.apache.inlong.common.metric.Dimension;
@@ -29,6 +26,9 @@ import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItem;
import org.apache.inlong.common.metric.MetricRegister;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
@MetricDomain(name = "AgentJob")
public class JobJmxMetrics extends MetricItem implements JobMetrics {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobMetrics.java
similarity index 96%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobMetrics.java
index 9fce70648..7996bdedd 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.job;
+package org.apache.inlong.agent.metrics.job;
public interface JobMetrics {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobPrometheusMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobPrometheusMetrics.java
similarity index 97%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobPrometheusMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobPrometheusMetrics.java
index d2a223943..aaa99b753 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobPrometheusMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/job/JobPrometheusMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.job;
+package org.apache.inlong.agent.metrics.job;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginJmxMetric.java
similarity index 98%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginJmxMetric.java
index 93c7e8e85..9c18179c7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginJmxMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.plugin;
import org.apache.inlong.agent.metrics.Metric;
import org.apache.inlong.common.metric.Dimension;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginMetric.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginMetric.java
index 496dc68b6..2aee2755f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginMetric.java
@@ -15,7 +15,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.plugin;
public interface PluginMetric {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginPrometheusMetric.java
similarity index 99%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginPrometheusMetric.java
index 088f94ddf..9974e57f2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/plugin/PluginPrometheusMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.plugin;
import io.prometheus.client.Counter;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkJmxMetric.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkJmxMetric.java
index b4cf365e0..d57b2a499 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkJmxMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.sink;
import org.apache.inlong.agent.metrics.Metric;
import org.apache.inlong.common.metric.Dimension;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkMetric.java
similarity index 96%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkMetric.java
index a9e6d92cb..ee9a8547e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.sink;
public interface SinkMetric {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkPrometheusMetric.java
similarity index 98%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkPrometheusMetric.java
index 4e6a62bd3..27ed531c3 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/sink/SinkPrometheusMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.sink;
import io.prometheus.client.Counter;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceJmxMetric.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceJmxMetric.java
index 2f551f877..cb3bcdad7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceJmxMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.source;
import org.apache.inlong.agent.metrics.Metric;
import org.apache.inlong.common.metric.Dimension;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceMetric.java
similarity index 96%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceMetric.java
index 47b24721f..b6be83ce0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourceMetric.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.source;
public interface SourceMetric {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourcePrometheusMetric.java
similarity index 98%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourcePrometheusMetric.java
index 260e7c48b..c7db40c86 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/source/SourcePrometheusMetric.java
@@ -15,7 +15,7 @@
* under the License.
*/
-package org.apache.inlong.agent.plugin.metrics;
+package org.apache.inlong.agent.metrics.source;
import io.prometheus.client.Counter;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskJmxMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskJmxMetrics.java
similarity index 98%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskJmxMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskJmxMetrics.java
index 833008d78..ba9d13448 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskJmxMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskJmxMetrics.java
@@ -17,10 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.task;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+package org.apache.inlong.agent.metrics.task;
import org.apache.inlong.common.metric.CountMetric;
import org.apache.inlong.common.metric.Dimension;
@@ -29,6 +26,9 @@ import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItem;
import org.apache.inlong.common.metric.MetricRegister;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* metrics for agent task
*/
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskMetrics.java
similarity index 96%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskMetrics.java
index 13006d162..930d6b60e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.task;
+package org.apache.inlong.agent.metrics.task;
public interface TaskMetrics {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPrometheusMetrics.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskPrometheusMetrics.java
similarity index 98%
rename from
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPrometheusMetrics.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskPrometheusMetrics.java
index 7914aeeef..ee4e5d857 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPrometheusMetrics.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/task/TaskPrometheusMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.agent.core.task;
+package org.apache.inlong.agent.metrics.task;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
deleted file mode 100644
index fd6fad2b4..000000000
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.agent.utils;
-
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_ENABLE;
-import static org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_ENABLE;
-
-/**
- * ConfigUtil
- */
-public class ConfigUtil {
-
- /**
- * whether use prometheus metrics
- */
- public static boolean isPrometheusEnabled() {
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- return conf.getBoolean(PROMETHEUS_ENABLE, DEFAULT_PROMETHEUS_ENABLE);
- }
-}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
index 0fb9e6cab..f1a716884 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
@@ -17,8 +17,6 @@
package org.apache.inlong.agent.core;
-import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.DefaultExports;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -26,16 +24,14 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.AgentMetricBaseListener;
+import org.apache.inlong.agent.metrics.AgentMetricSingleton;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT;
-
/**
* Agent entrance class
*/
@@ -43,14 +39,7 @@ public class AgentMain {
private static final Logger LOGGER =
LoggerFactory.getLogger(AgentMain.class);
- private static HTTPServer metricsServer;
-
- static {
- if (ConfigUtil.isPrometheusEnabled()) {
- // register hotspot collectors
- DefaultExports.initialize();
- }
- }
+ private static AgentMetricBaseListener agentJmxMetricHandler;
/**
* Print help information
@@ -129,14 +118,7 @@ public class AgentMain {
try {
manager.start();
stopManagerIfKilled(manager);
-
- if (ConfigUtil.isPrometheusEnabled()) {
- // starting metrics server
- int metricsServerPort = AgentConfiguration.getAgentConf()
- .getInt(PROMETHEUS_EXPORTER_PORT, DEFAULT_PROMETHEUS_EXPORTER_PORT);
- LOGGER.info("Starting prometheus metrics server on port {}", metricsServerPort);
- metricsServer = new HTTPServer(metricsServerPort);
- }
+ agentJmxMetricHandler = AgentMetricSingleton.getAgentMetricHandler();
manager.join();
} catch (Exception ex) {
@@ -144,9 +126,7 @@ public class AgentMain {
} finally {
manager.stop();
AuditUtils.sendReport();
- if (metricsServer != null) {
- metricsServer.stop();
- }
+ agentJmxMetricHandler.close();
}
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 1c3a3d6f9..908048f6a 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -25,8 +25,9 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.JobProfileDb;
import org.apache.inlong.agent.db.StateSearchKey;
+import org.apache.inlong.agent.metrics.AgentMetricSingleton;
+import org.apache.inlong.agent.metrics.job.JobMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,11 +97,7 @@ public class JobManager extends AbstractDaemon {
this.jobDbCacheCheckInterval =
conf.getLong(JOB_DB_CACHE_CHECK_INTERVAL, DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
this.jobMaxSize = conf.getLong(JOB_NUMBER_LIMIT,
DEFAULT_JOB_NUMBER_LIMIT);
- if (ConfigUtil.isPrometheusEnabled()) {
- this.jobMetrics = new JobPrometheusMetrics();
- } else {
- this.jobMetrics = JobJmxMetrics.create();
- }
+ this.jobMetrics = AgentMetricSingleton.getAgentMetricHandler().jobMetrics;
}
/**
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 918bb4483..376e4bd7f 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -22,8 +22,9 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
+import org.apache.inlong.agent.metrics.AgentMetricSingleton;
+import org.apache.inlong.agent.metrics.task.TaskMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,11 +68,7 @@ public class TaskManager extends AbstractDaemon {
new SynchronousQueue<>(),
new AgentThreadFactory("task"));
// metric for task level
- if (ConfigUtil.isPrometheusEnabled()) {
- this.taskMetrics = new TaskPrometheusMetrics();
- } else {
- this.taskMetrics = TaskJmxMetrics.create();
- }
+ this.taskMetrics = AgentMetricSingleton.getAgentMetricHandler().taskMetrics;
tasks = new ConcurrentHashMap<>();
AgentConfiguration conf = AgentConfiguration.getAgentConf();
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
index c98da4e09..285d0a3c8 100644
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/TestTaskJmxMetrics.java
@@ -17,7 +17,8 @@
package org.apache.inlong.agent.core;
-import org.apache.inlong.agent.core.task.TaskJmxMetrics;
+import org.apache.inlong.agent.metrics.AgentJmxMetricListener;
+import org.apache.inlong.agent.metrics.task.TaskJmxMetrics;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -30,7 +31,7 @@ public class TestTaskJmxMetrics {
@Test
public void testAgentMetrics() {
try {
- TaskJmxMetrics taskJmxMetrics = TaskJmxMetrics.create();
+ TaskJmxMetrics taskJmxMetrics = (TaskJmxMetrics) new AgentJmxMetricListener().taskMetrics;
taskJmxMetrics.incRetryingTaskCount();
Assert.assertEquals(taskJmxMetrics.module, "AgentTaskMetric");
} catch (Exception ex) {
diff --git a/inlong-agent/agent-docker/agent-docker.sh
b/inlong-agent/agent-docker/agent-docker.sh
index ed2a37177..42f0cb7ad 100644
--- a/inlong-agent/agent-docker/agent-docker.sh
+++ b/inlong-agent/agent-docker/agent-docker.sh
@@ -30,8 +30,8 @@ agent.dataproxy.http.host=$DATAPROXY_IP
agent.dataproxy.http.port=$DATAPROXY_PORT
agent.http.port=8008
agent.http.enable=true
-agent.prometheus.enable=true
-agent.prometheus.exporter.port=8080
+agent.domainListeners=org.apache.inlong.agent.metrics.AgentPrometheusMetricListener
+agent.prometheus.exporter.port=9080
audit.proxys=$AUDIT_PROXY_URL
EOF
# start
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 583a3c5ae..7b57c5e0f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -22,7 +22,6 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.Channel;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
/**
@@ -54,12 +54,12 @@ public class MemoryChannel implements Channel {
if (message instanceof ProxyMessage) {
groupId = ((ProxyMessage) message).getInlongGroupId();
}
- GlobalMetrics.incReadNum(groupId);
+ GLOBAL_METRICS.incReadNum(groupId);
queue.put(message);
- GlobalMetrics.incReadSuccessNum(groupId);
+ GLOBAL_METRICS.incReadSuccessNum(groupId);
}
} catch (InterruptedException ex) {
- GlobalMetrics.incReadFailedNum(groupId);
+ GLOBAL_METRICS.incReadFailedNum(groupId);
Thread.currentThread().interrupt();
}
}
@@ -72,17 +72,17 @@ public class MemoryChannel implements Channel {
if (message instanceof ProxyMessage) {
groupId = ((ProxyMessage) message).getInlongGroupId();
}
- GlobalMetrics.incReadNum(groupId);
+ GLOBAL_METRICS.incReadNum(groupId);
boolean result = queue.offer(message, timeout, unit);
if (result) {
- GlobalMetrics.incReadSuccessNum(groupId);
+ GLOBAL_METRICS.incReadSuccessNum(groupId);
} else {
- GlobalMetrics.incReadFailedNum(groupId);
+ GLOBAL_METRICS.incReadFailedNum(groupId);
}
return result;
}
} catch (InterruptedException ex) {
- GlobalMetrics.incReadFailedNum(groupId);
+ GLOBAL_METRICS.incReadFailedNum(groupId);
Thread.currentThread().interrupt();
}
return false;
@@ -97,11 +97,11 @@ public class MemoryChannel implements Channel {
if (message instanceof ProxyMessage) {
groupId = ((ProxyMessage) message).getInlongGroupId();
}
- GlobalMetrics.incSendSuccessNum(groupId);
+ GLOBAL_METRICS.incSendSuccessNum(groupId);
}
return message;
} catch (InterruptedException ex) {
- GlobalMetrics.incSendFailedNum(groupId);
+ GLOBAL_METRICS.incSendFailedNum(groupId);
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
@@ -120,6 +120,6 @@ public class MemoryChannel implements Channel {
queue.clear();
}
LOGGER.info("destroy channel, show memory channel metric:");
- GlobalMetrics.showMemoryChannelStatics();
+ GLOBAL_METRICS.showMemoryChannelStatics();
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
index b6a14a1f3..5bddbcee9 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
@@ -20,10 +20,11 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import java.nio.charset.StandardCharsets;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
+
/**
* message write to console
*/
@@ -40,10 +41,10 @@ public class ConsoleSink extends AbstractSink {
if (message != null) {
System.out.println(new String(message.getBody(),
StandardCharsets.UTF_8));
// increment the count of successful sinks
- GlobalMetrics.incSinkSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSinkSuccessCount(metricTagName);
} else {
// increment the count of failed sinks
- GlobalMetrics.incSinkFailCount(metricTagName);
+ GLOBAL_METRICS.incSinkFailCount(metricTagName);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index b97116ad3..a6f851201 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -27,7 +27,6 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.message.PackProxyMessage;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -42,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
@@ -112,10 +112,10 @@ public class ProxySink extends AbstractSink {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis());
// increment the count of successful sinks
- GlobalMetrics.incSinkSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSinkSuccessCount(metricTagName);
} else {
// increment the count of failed sinks
- GlobalMetrics.incSinkFailCount(metricTagName);
+ GLOBAL_METRICS.incSinkFailCount(metricTagName);
}
}
} catch (Exception e) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 52c1ce948..4fe7c321b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -23,7 +23,6 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.message.SequentialID;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
@@ -41,6 +40,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -286,7 +286,7 @@ public class SenderManager {
return;
}
semaphore.release(bodyList.size());
- GlobalMetrics.incSendSuccessNum(groupId + "_" + streamId, bodyList.size());
+ GLOBAL_METRICS.incSendSuccessNum(groupId + "_" + streamId, bodyList.size());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath,
bodyList.size());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
index dfb0aba07..e1d2ed8c9 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
@@ -52,7 +52,7 @@ public class BinlogSource implements Source {
Reader binlogReader = new BinlogReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(binlogReader);
- GlobalMetrics.incSourceSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
return readerList;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index ee13ddb71..9a43e23da 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
import org.apache.inlong.agent.utils.AgentDbUtils;
import org.slf4j.Logger;
@@ -30,6 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
@@ -86,11 +86,11 @@ public class DatabaseSqlSource implements Source {
}
if (readerList != null) {
// increment the count of successful sources
- GlobalMetrics.incSourceSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
} else {
// database type or sql is incorrect
// increment the count of failed sources
- GlobalMetrics.incSourceFailCount(metricTagName);
+ GLOBAL_METRICS.incSourceFailCount(metricTagName);
}
return readerList;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index a817c4e12..6f91e1746 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -38,6 +37,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
@@ -135,9 +135,9 @@ public class KafkaSource implements Source {
addValidator(filterPattern, kafkaReader);
result.add(kafkaReader);
}
- GlobalMetrics.incSourceSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
} else {
- GlobalMetrics.incSourceFailCount(metricTagName);
+ GLOBAL_METRICS.incSourceFailCount(metricTagName);
}
return result;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 215d599f6..9d5adf674 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.slf4j.Logger;
@@ -31,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
@@ -72,7 +72,7 @@ public class TextFileSource implements Source {
result.add(textFileReader);
}
// increment the count of successful sources
- GlobalMetrics.incSourceSuccessCount(metricTagName);
+ GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
return result;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 5e519d731..8c7ffdd9c 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -30,7 +30,6 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.SnapshotModeConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
@@ -51,6 +50,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
@@ -113,7 +113,7 @@ public class BinlogReader extends AbstractReader {
@Override
public Message read() {
if (!binlogMessagesQueue.isEmpty()) {
- GlobalMetrics.incReadNum(metricTagName);
+ GLOBAL_METRICS.incReadNum(metricTagName);
return getBinlogMessage();
} else {
return null;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 18bd19362..70658c31a 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -24,7 +24,6 @@ import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -44,6 +43,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_OFFSET;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
@@ -122,7 +122,7 @@ public class KafkaReader<K, V> extends AbstractReader {
"partition:" + record.partition()
+ ", value:" + new String(recordValue) + ",
offset:" + record.offset());
// control speed
- GlobalMetrics.incReadNum(metricTagName);
+ GLOBAL_METRICS.incReadNum(metricTagName);
// commit succeed,then record current offset
snapshot = record.partition() +
JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
DefaultMessage message = new DefaultMessage(recordValue,
headerMap);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 880231cb1..672100bad 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -24,7 +24,6 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentDbUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -42,6 +41,7 @@ import static java.sql.Types.BINARY;
import static java.sql.Types.BLOB;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
/**
* Read data from database by SQL
@@ -118,14 +118,14 @@ public class SqlReader extends AbstractReader {
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis());
- GlobalMetrics.incReadNum(metricTagName);
+ GLOBAL_METRICS.incReadNum(metricTagName);
return generateMessage(lineColumns);
} else {
finished = true;
}
} catch (Exception ex) {
LOGGER.error("error while reading data", ex);
- GlobalMetrics.incReadFailedNum(metricTagName);
+ GLOBAL_METRICS.incReadFailedNum(metricTagName);
throw new RuntimeException(ex);
}
return null;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 312dd281e..a620edf0f 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -24,7 +24,6 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.except.FileException;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -39,6 +38,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
@@ -89,7 +89,7 @@ public class TextFileReader extends AbstractReader {
if (validateMessage(message)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis());
- GlobalMetrics.incReadNum(metricTagName);
+ GLOBAL_METRICS.incReadNum(metricTagName);
return new
DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
}
}
@@ -195,6 +195,6 @@ public class TextFileReader extends AbstractReader {
}
AgentUtils.finallyClose(stream);
LOGGER.info("destroy reader with read {} num {}",
- metricTagName, GlobalMetrics.getReadNum(metricTagName));
+ metricTagName, GLOBAL_METRICS.getReadNum(metricTagName));
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index 9cf65c80c..842c73910 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_FETCH_CENTER_INTERVAL_SECONDS;
-import static
org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_ENABLE;
public class MiniAgent {
@@ -46,7 +45,6 @@ public class MiniAgent {
public MiniAgent() throws Exception {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
conf.setInt(AGENT_FETCH_CENTER_INTERVAL_SECONDS, 1);
- conf.setBoolean(PROMETHEUS_ENABLE, true);
manager = new AgentManager();
TaskPositionManager taskPositionManager =
PowerMockito.mock(TaskPositionManager.class);
HeartbeatManager heartbeatManager =
PowerMockito.mock(HeartbeatManager.class);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
index 2fa69a7ed..ce5a2a1b0 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
@@ -17,9 +17,11 @@
package org.apache.inlong.agent.plugin.metrics;
+import org.apache.inlong.agent.metrics.global.GlobalMetrics;
import org.junit.Test;
import static junit.framework.TestCase.assertEquals;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
public class GlobalMetricsTest {
@@ -33,33 +35,36 @@ public class GlobalMetricsTest {
public void testPluginMetric() {
String tag1 = groupId1 + "_" + streamId;
String tag2 = groupId2 + "_" + streamId;
- GlobalMetrics.incReadNum(tag1);
- assertEquals(GlobalMetrics.getReadNum(tag1), 1);
- assertEquals(GlobalMetrics.getSendSuccessNum(tag2), 0);
- GlobalMetrics.incSendSuccessNum(tag2, 10);
- assertEquals(GlobalMetrics.getSendSuccessNum(tag2), 10);
- GlobalMetrics.incSendSuccessNum(tag2);
- assertEquals(GlobalMetrics.getSendSuccessNum(tag2), 11);
+ GlobalMetrics globalMetrics = GLOBAL_METRICS;
+ globalMetrics.incReadNum(tag1);
+ assertEquals(globalMetrics.getReadNum(tag1), 1);
+ assertEquals(globalMetrics.getSendSuccessNum(tag2), 0);
+ globalMetrics.incSendSuccessNum(tag2, 10);
+ assertEquals(globalMetrics.getSendSuccessNum(tag2), 10);
+ globalMetrics.incSendSuccessNum(tag2);
+ assertEquals(globalMetrics.getSendSuccessNum(tag2), 11);
}
@Test
public void testSinkMetric() {
String tag = sinkTag + "_" + groupId1 + "_" + streamId;
- GlobalMetrics.incSinkFailCount(tag);
- assertEquals(GlobalMetrics.getSinkFailCount(tag), 1);
+ GlobalMetrics globalMetrics = GLOBAL_METRICS;
+ globalMetrics.incSinkFailCount(tag);
+ assertEquals(globalMetrics.getSinkFailCount(tag), 1);
}
@Test
public void testSourceMetric() {
String tag1 = sourceTag + "_" + groupId1 + "_" + streamId;
- GlobalMetrics.incSourceSuccessCount(tag1);
- GlobalMetrics.incSourceSuccessCount(tag1);
- assertEquals(GlobalMetrics.getSourceSuccessCount(tag1), 2);
+ GlobalMetrics globalMetrics = GLOBAL_METRICS;
+ globalMetrics.incSourceSuccessCount(tag1);
+ globalMetrics.incSourceSuccessCount(tag1);
+ assertEquals(globalMetrics.getSourceSuccessCount(tag1), 2);
String tag2 = sourceTag + "_" + groupId2 + "_" + streamId;
- assertEquals(GlobalMetrics.getSourceSuccessCount(tag2), 0);
- GlobalMetrics.incSourceSuccessCount(tag2);
- assertEquals(GlobalMetrics.getSourceSuccessCount(tag2), 1);
+ assertEquals(globalMetrics.getSourceSuccessCount(tag2), 0);
+ globalMetrics.incSourceSuccessCount(tag2);
+ assertEquals(globalMetrics.getSourceSuccessCount(tag2), 1);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 0ce57bccc..a9c8a9ecb 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -22,13 +22,13 @@ import
org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
@@ -54,10 +54,10 @@ public class MockSink implements Sink {
number.incrementAndGet();
taskPositionManager.updateSinkPosition(jobInstanceId,
sourceFileName, 1);
// increment the count of successful sinks
- GlobalMetrics.incSinkSuccessCount(tagName);
+ GLOBAL_METRICS.incSinkSuccessCount(tagName);
} else {
// increment the count of failed sinks
- GlobalMetrics.incSinkFailCount(tagName);
+ GLOBAL_METRICS.incSinkFailCount(tagName);
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 18545403c..6d10f3048 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -22,17 +22,14 @@ import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
-import org.apache.inlong.agent.plugin.utils.TestUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +53,6 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTE
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.script.*",
"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
"org.w3c.*"})
@PrepareForTest({MetricRegister.class})
@@ -101,7 +97,6 @@ public class TestTextFileReader {
@Test
public void testTextFileReader() throws Exception {
- TestUtils.mockMetricRegister();
URI uri = getClass().getClassLoader().getResource("test").toURI();
JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
String mainPath = Paths.get(uri).toString();
@@ -128,7 +123,6 @@ public class TestTextFileReader {
@Test
public void testTextSeekReader() throws Exception {
- TestUtils.mockMetricRegister();
Path localPath = Paths.get(testDir.toString(), "test.txt");
LOGGER.info("start to create {}", localPath);
List<String> beforeList = new ArrayList<>();
@@ -153,7 +147,6 @@ public class TestTextFileReader {
@Test
public void testTextTailTimeout() throws Exception {
- TestUtils.mockMetricRegister();
JobProfile jobProfile = new JobProfile();
jobProfile.setInt(JOB_FILE_MAX_WAIT, 1);
jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
diff --git a/inlong-agent/bin/agent-env.sh b/inlong-agent/bin/agent-env.sh
index 339c2485f..abcf20447 100755
--- a/inlong-agent/bin/agent-env.sh
+++ b/inlong-agent/bin/agent-env.sh
@@ -51,7 +51,7 @@ CONFIG_DIR=${BASE_DIR}"/conf/"
JAR_LIBS=${BASE_DIR}"/lib/*"
CLASSPATH=${CONFIG_DIR}:${JAR_LIBS}
-JMX_ENABLED=$(grep -c "agent.prometheus.enable=false"
$BASE_DIR/conf/agent.properties)
+JMX_ENABLED=$(grep -c
"agent.domainListeners=org.apache.inlong.agent.metrics.AgentJmxMetricListener"
$BASE_DIR/conf/agent.properties)
if [[ $JMX_ENABLED == 1 ]]; then
export AGENT_ARGS="$AGENT_JVM_ARGS $AGENT_RMI_ARGS -cp $CLASSPATH
-Dagent.home=$BASE_DIR"
else
diff --git a/inlong-agent/conf/agent.properties
b/inlong-agent/conf/agent.properties
index 92e036ca4..f910751d3 100755
--- a/inlong-agent/conf/agent.properties
+++ b/inlong-agent/conf/agent.properties
@@ -35,8 +35,6 @@ agent.http.enable=true
# http default port
agent.http.port=8008
-
-
######################
# fetch center
######################
@@ -44,7 +42,6 @@ agent.http.port=8008
agent.fetchCenter.interval=5
agent.fetcher.classname=org.apache.inlong.agent.plugin.fetcher.ManagerFetcher
-
#######################
# common config
#######################
@@ -55,7 +52,6 @@ agent.local.uuid=
agent.local.uuid.open=false
agent.enable.oom.exit=false
-
###########################
# job/job manager config
###########################
@@ -74,7 +70,6 @@ job.finish.checkInterval=6
# the amount of jobs agent can support
job.number.limit=15
-
############################
# task/task manager config
############################
@@ -99,8 +94,6 @@ channel.memory.capacity=5000
# snapshotreport scheduled cron
agent.scheduled.snapshotreport=0 0/1 * * * ? *
-
-
############################
# manager config
############################
@@ -109,17 +102,16 @@ agent.manager.vip.http.port=8083
agent.manager.auth.secretId=
agent.manager.auth.secretKey=
-
-
############################
-# prometheus config
+# metric config
+# JMX and Prometheus modes are currently supported. To use JMX, set the
+# listener to org.apache.inlong.agent.metrics.AgentJmxMetricListener; to use
+# Prometheus, set the listener to
+# org.apache.inlong.agent.metrics.AgentPrometheusMetricListener
############################
-# whether to enable prometheus
-agent.prometheus.enable=true
-# prometheus exporter server default port
+agent.domainListeners=org.apache.inlong.agent.metrics.AgentPrometheusMetricListener
+# Default port of the exporter server (only used when Prometheus is enabled)
agent.prometheus.exporter.port=9080
-
############################
# audit config
############################
@@ -127,5 +119,3 @@ agent.prometheus.exporter.port=9080
agent.audit.enable=true
# audit proxy address
audit.proxys=127.0.0.1:10081
-
-