This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new e3242e1d8 [INLONG-6115][Agent] Solve Prometheus listener error and add
unit tests (#6112)
e3242e1d8 is described below
commit e3242e1d8fb2a349c2504c16062f849882b25c09
Author: Keylchen <[email protected]>
AuthorDate: Mon Oct 10 19:53:56 2022 +0800
[INLONG-6115][Agent] Solve Prometheus listener error and add unit tests
(#6112)
---
.../metrics/AgentPrometheusMetricListener.java | 17 +-
.../agent/metrics/TestPrometheusListener.java | 215 +++++++++++++++++++++
2 files changed, 224 insertions(+), 8 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
index b1a7911ae..6e809e7eb 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics;
import io.prometheus.client.Collector;
import io.prometheus.client.CounterMetricFamily;
import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.common.metric.MetricItemValue;
import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +65,8 @@ import static
org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN;
*/
public class AgentPrometheusMetricListener extends Collector implements
MetricListener {
- private static final Logger LOGGER =
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
public static final String DEFAULT_DIMENSION_LABEL = "dimension";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
protected HTTPServer httpServer;
private AgentMetricItem metricItem;
private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
@@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends
Collector implements MetricLi
} catch (IOException e) {
LOGGER.error("exception while register agent prometheus http
server,error:{}", e.getMessage());
}
- this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
-
}
@Override
public List<MetricFamilySamples> collect() {
+ DefaultExports.initialize();
// total
- CounterMetricFamily totalCounter = new
CounterMetricFamily("group=total",
- "The metrics of agent node.",
- Arrays.asList("dimension"));
+ CounterMetricFamily totalCounter = new CounterMetricFamily("total",
"metrics_of_agent_node_total",
+ Arrays.asList(DEFAULT_DIMENSION_LABEL));
totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT),
metricItem.jobRunningCount.get());
totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT),
metricItem.jobFatalCount.get());
totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT),
metricItem.taskRunningCount.get());
@@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends
Collector implements MetricLi
mfs.add(totalCounter);
// id dimension
- CounterMetricFamily idCounter = new CounterMetricFamily("group=id",
- "The metrics of agent dimensions.", this.dimensionKeys);
+ List<String> dimensionIdKeys = new ArrayList<>();
+ dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL);
+ dimensionIdKeys.addAll(this.dimensionKeys);
+ CounterMetricFamily idCounter = new CounterMetricFamily("id",
"metrics_of_agent_dimensions", dimensionIdKeys);
for (Entry<String, MetricItemValue> entry :
this.dimensionMetricValueMap.entrySet()) {
MetricItemValue itemValue = entry.getValue();
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
new file mode 100644
index 000000000..298f87098
--- /dev/null
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.common.metric.MetricListenerRunnable;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.metric.MetricValue;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT;
+
+/**
+ * use to test prometheus listener.
+ */
+public class TestPrometheusListener {
+
+ protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestPrometheusListener.class);
+ private static final Map<String, AtomicLong> metricValueMap = new
ConcurrentHashMap<>();
+ protected static AgentMetricItemSet metricItemSet;
+ protected static Map<String, String> dimensions;
+ private static AgentMetricItem metricItem = new AgentMetricItem();
+ private final Map<String, MetricItemValue> dimensionMetricValueMap = new
ConcurrentHashMap<>();
+ private final List<String> dimensionKeys = new ArrayList<>();
+
+ @BeforeClass
+ public static void setup() {
+ dimensions = new HashMap<>();
+ dimensions.put(KEY_PLUGIN_ID,
TestPrometheusListener.class.getSimpleName());
+ String groupId1 = "groupId_test1";
+ dimensions.put(KEY_INLONG_GROUP_ID, groupId1);
+ String streamId = "streamId";
+ dimensions.put(KEY_INLONG_STREAM_ID, streamId);
+ String metricName = String.join("-",
TestPrometheusListener.class.getSimpleName(),
+ String.valueOf(METRIC_INDEX.incrementAndGet()));
+ metricItemSet = new AgentMetricItemSet(metricName);
+ MetricRegister.register(metricItemSet);
+ Assert.assertEquals(metricItemSet.getName(),
"TestPrometheusListener-1");
+ metricValueMap.put(M_JOB_RUNNING_COUNT, metricItem.jobRunningCount);
+ metricValueMap.put(M_JOB_FATAL_COUNT, metricItem.jobFatalCount);
+
+ metricValueMap.put(M_TASK_RUNNING_COUNT, metricItem.taskRunningCount);
+ metricValueMap.put(M_TASK_RETRYING_COUNT,
metricItem.taskRetryingCount);
+ metricValueMap.put(M_TASK_FATAL_COUNT, metricItem.taskFatalCount);
+
+ metricValueMap.put(M_SINK_SUCCESS_COUNT, metricItem.sinkSuccessCount);
+ metricValueMap.put(M_SINK_FAIL_COUNT, metricItem.sinkFailCount);
+
+ metricValueMap.put(M_SOURCE_SUCCESS_COUNT,
metricItem.sourceSuccessCount);
+ metricValueMap.put(M_SOURCE_FAIL_COUNT, metricItem.sourceFailCount);
+
+ metricValueMap.put(M_PLUGIN_READ_COUNT, metricItem.pluginReadCount);
+ metricValueMap.put(M_PLUGIN_SEND_COUNT, metricItem.pluginSendCount);
+ metricValueMap.put(M_PLUGIN_READ_FAIL_COUNT,
metricItem.pluginReadFailCount);
+ metricValueMap.put(M_PLUGIN_SEND_FAIL_COUNT,
metricItem.pluginSendFailCount);
+ metricValueMap.put(M_PLUGIN_READ_SUCCESS_COUNT,
metricItem.pluginReadSuccessCount);
+ metricValueMap.put(M_PLUGIN_SEND_SUCCESS_COUNT,
metricItem.pluginSendSuccessCount);
+ }
+
+ @Test
+ public void testSnapshot() {
+ metricItem = metricItemSet.findMetricItem(dimensions);
+ metricItem.pluginReadFailCount.incrementAndGet();
+ metricItem.pluginReadSuccessCount.incrementAndGet();
+ // report
+ MetricListener listener = new MetricListener() {
+ @Override
+ public void snapshot(String domain, List<MetricItemValue>
itemValues) {
+ for (MetricItemValue itemValue : itemValues) {
+ String key = itemValue.getKey();
+ LOGGER.info("KEY : " + key);
+ Map<String, MetricValue> metricMap =
itemValue.getMetrics();
+ // total
+ for (Entry<String, MetricValue> entry :
itemValue.getMetrics().entrySet()) {
+ String fieldName = entry.getValue().name;
+ AtomicLong metricValue = metricValueMap.get(fieldName);
+ if (metricValue != null) {
+ long fieldValue = entry.getValue().value;
+ metricValue.addAndGet(fieldValue);
+ metricValue.addAndGet(100);
+ }
+ }
+ // dimension
+ String dimensionKey = itemValue.getKey();
+ MetricItemValue dimensionMetricValue =
dimensionMetricValueMap.get(dimensionKey);
+ if (dimensionMetricValue == null) {
+ dimensionMetricValue = new
MetricItemValue(dimensionKey,
+ new ConcurrentHashMap<String, String>(),
+ new ConcurrentHashMap<String, MetricValue>());
+ dimensionMetricValueMap.putIfAbsent(dimensionKey,
dimensionMetricValue);
+ dimensionMetricValue =
dimensionMetricValueMap.get(dimensionKey);
+
dimensionMetricValue.getDimensions().putAll(itemValue.getDimensions());
+ // add prometheus label name
+ for (Entry<String, String> entry :
itemValue.getDimensions().entrySet()) {
+ if (!dimensionKeys.contains(entry.getKey())) {
+ dimensionKeys.add(entry.getKey());
+ }
+ }
+ }
+ // count
+ for (Entry<String, MetricValue> entry :
itemValue.getMetrics().entrySet()) {
+ String fieldName = entry.getValue().name;
+ MetricValue metricValue =
dimensionMetricValue.getMetrics().get(fieldName);
+ if (metricValue == null) {
+ metricValue = MetricValue.of(fieldName,
entry.getValue().value);
+
dimensionMetricValue.getMetrics().put(metricValue.name, metricValue);
+ continue;
+ }
+ metricValue.value += entry.getValue().value;
+ }
+ }
+ }
+ };
+ Assert.assertEquals(metricItem.pluginReadSuccessCount.get(), 1);
+ Assert.assertEquals(metricItem.pluginReadFailCount.get(), 1);
+ List<MetricListener> listeners = new ArrayList<>();
+ listeners.add(listener);
+ MetricListenerRunnable runnable = new MetricListenerRunnable("Agent",
listeners);
+ runnable.run();
+
Assert.assertEquals(metricValueMap.get("pluginReadFailCount").intValue(), 101);
+ Assert.assertTrue(
+
dimensionMetricValueMap.toString().contains("{\"name\":\"pluginReadSuccessCount\",\"value\":1}"));
+ Assert.assertTrue(dimensionKeys.contains("inlongGroupId"));
+ for (Entry<String, MetricItemValue> entry :
this.dimensionMetricValueMap.entrySet()) {
+ MetricItemValue itemValue = entry.getValue();
+ // JOB
+ addMetric(M_JOB_RUNNING_COUNT, itemValue);
+ addMetric(M_JOB_FATAL_COUNT, itemValue);
+ // TASK
+ addMetric(M_TASK_RUNNING_COUNT, itemValue);
+ addMetric(M_TASK_RETRYING_COUNT, itemValue);
+ addMetric(M_TASK_FATAL_COUNT, itemValue);
+ // SINK
+ addMetric(M_SINK_SUCCESS_COUNT, itemValue);
+ addMetric(M_SINK_FAIL_COUNT, itemValue);
+ // SOURCE
+ addMetric(M_SOURCE_SUCCESS_COUNT, itemValue);
+ addMetric(M_SOURCE_FAIL_COUNT, itemValue);
+ // PLUGIN
+ addMetric(M_PLUGIN_READ_COUNT, itemValue);
+ addMetric(M_PLUGIN_SEND_COUNT, itemValue);
+ addMetric(M_PLUGIN_READ_FAIL_COUNT, itemValue);
+ addMetric(M_PLUGIN_SEND_FAIL_COUNT, itemValue);
+ addMetric(M_PLUGIN_READ_SUCCESS_COUNT, itemValue);
+ addMetric(M_PLUGIN_SEND_SUCCESS_COUNT, itemValue);
+ }
+ List<MetricItemValue> metricItemValueList = new
ArrayList<>(dimensionMetricValueMap.values());
+ AgentPrometheusMetricListener agentPrometheusMetricListener = new
AgentPrometheusMetricListener();
+ agentPrometheusMetricListener.snapshot("Agent", metricItemValueList);
+ LOGGER.debug(agentPrometheusMetricListener.collect().toString());
+ }
+
+ private void addMetric(String defaultDimension, MetricItemValue itemValue)
{
+ List<String> labelValues = new ArrayList<>(this.dimensionKeys.size());
+ labelValues.add(defaultDimension);
+ Map<String, String> dimensions = itemValue.getDimensions();
+ for (String key : this.dimensionKeys) {
+ String labelValue = dimensions.getOrDefault(key, "-");
+ labelValues.add(labelValue);
+ }
+ long value = 0L;
+ Map<String, MetricValue> metricValueMap = itemValue.getMetrics();
+ MetricValue metricValue = metricValueMap.get(defaultDimension);
+ if (metricValue != null) {
+ value = metricValue.value;
+ }
+ LOGGER.debug("labelValues is " + labelValues + " and value is " +
value);
+ }
+}