This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a9b5a8acb [INLONG-6898][Sort] Unify metrics report model of
SortStandalone (#6947)
a9b5a8acb is described below
commit a9b5a8acb455a028e0a186d1a786fa47b4dd73d4
Author: vernedeng <[email protected]>
AuthorDate: Mon Dec 19 16:35:06 2022 +0800
[INLONG-6898][Sort] Unify metrics report model of SortStandalone (#6947)
---
.../sort/standalone/metrics/MetricItemValue.java | 73 ----------
.../sort/standalone/metrics/MetricListener.java | 39 -----
.../standalone/metrics/MetricListenerRunnable.java | 142 ------------------
.../sort/standalone/metrics/MetricObserver.java | 114 ---------------
.../metrics/TestMetricListenerRunnable.java | 161 ---------------------
.../sort/standalone/SortStandaloneApplication.java | 2 +-
.../prometheus/PrometheusMetricListener.java | 5 +-
7 files changed, 4 insertions(+), 532 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
deleted file mode 100644
index 25be6d8e3..000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.metrics;
-
-import java.util.Map;
-
-import org.apache.inlong.common.metric.MetricValue;
-
-/**
- *
- * MetricItemValue
- */
-public class MetricItemValue {
-
- private final String key;
- private final Map<String, String> dimensions;
- private final Map<String, MetricValue> metrics;
-
- /**
- * Constructor
- *
- * @param key
- * @param dimensions
- * @param metrics
- */
- public MetricItemValue(String key, Map<String, String> dimensions,
Map<String, MetricValue> metrics) {
- this.key = key;
- this.dimensions = dimensions;
- this.metrics = metrics;
- }
-
- /**
- * get key
- *
- * @return the key
- */
- public String getKey() {
- return key;
- }
-
- /**
- * get dimensions
- *
- * @return the dimensions
- */
- public Map<String, String> getDimensions() {
- return dimensions;
- }
-
- /**
- * get metrics
- *
- * @return the metrics
- */
- public Map<String, MetricValue> getMetrics() {
- return metrics;
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
deleted file mode 100644
index 7475d833b..000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.metrics;
-
-import java.util.List;
-
-/**
- *
- * MetricListener
- */
-public interface MetricListener {
-
- String KEY_METRIC_DOMAINS = "metricDomains";
- String KEY_DOMAIN_LISTENERS = "domainListeners";
- String KEY_SNAPSHOT_INTERVAL = "snapshotInterval";
-
- /**
- * snapshot
- *
- * @param domain
- * @param itemValues
- */
- public void snapshot(String domain, List<MetricItemValue> itemValues);
-}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
deleted file mode 100644
index 5708a9092..000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.metrics;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.inlong.common.metric.MetricItem;
-import org.apache.inlong.common.metric.MetricItemMBean;
-import org.apache.inlong.common.metric.MetricItemSetMBean;
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.metric.MetricUtils;
-import org.apache.inlong.common.metric.MetricValue;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.slf4j.Logger;
-
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-
-/**
- *
- * MetricListenerRunnable
- */
-public class MetricListenerRunnable implements Runnable {
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(MetricListenerRunnable.class);
-
- private String domain;
- private List<MetricListener> listenerList;
-
- /**
- * Constructor
- *
- * @param domain
- * @param listenerList
- */
- public MetricListenerRunnable(String domain, List<MetricListener>
listenerList) {
- this.domain = domain;
- this.listenerList = listenerList;
- }
-
- /**
- * run
- */
- @Override
- public void run() {
- LOG.info("begin to snapshot metric:{}", domain);
- try {
- List<MetricItemValue> itemValues = this.getItemValues();
- LOG.info("snapshot metric:{},size:{},content:{}", domain,
itemValues.size(),
- new ObjectMapper().writeValueAsString(itemValues));
- this.listenerList.forEach((item) -> {
- item.snapshot(domain, itemValues);
- });
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- }
- LOG.info("end to snapshot metric:{}", domain);
- }
-
- /**
- * getItemValues
- *
- * @return MetricItemValue List
- * @throws InstanceNotFoundException
- * @throws AttributeNotFoundException
- * @throws ReflectionException
- * @throws MBeanException
- * @throws MalformedObjectNameException
- * @throws ClassNotFoundException
- */
- @SuppressWarnings("unchecked")
- public List<MetricItemValue> getItemValues() throws
InstanceNotFoundException, AttributeNotFoundException,
- ReflectionException, MBeanException, MalformedObjectNameException,
ClassNotFoundException {
- StringBuilder beanName = new StringBuilder();
-
beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
-
.append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
- .append(MetricItemMBean.PROPERTY_SEPARATOR)
- .append("*");
- ObjectName objName = new ObjectName(beanName.toString());
- final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
- LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
- List<MetricItemValue> itemValues = new ArrayList<>();
- for (ObjectInstance mbean : mbeans) {
- String className = mbean.getClassName();
- Class<?> clazz = ClassUtils.getClass(className);
- if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) {
- ObjectName metricObjectName = mbean.getObjectName();
- String dimensionsKey = (String)
mbs.getAttribute(metricObjectName,
- MetricItemMBean.ATTRIBUTE_KEY);
- Map<String, String> dimensions = (Map<String, String>) mbs
- .getAttribute(metricObjectName,
MetricItemMBean.ATTRIBUTE_DIMENSIONS);
- Map<String, MetricValue> metrics = (Map<String, MetricValue>)
mbs
- .invoke(metricObjectName,
MetricItemMBean.METHOD_SNAPSHOT, null, null);
- MetricItemValue itemValue = new MetricItemValue(dimensionsKey,
dimensions, metrics);
- LOG.info("MetricItemMBean get itemValue:{}", itemValue);
- itemValues.add(itemValue);
- } else if (ClassUtils.isAssignable(clazz,
MetricItemSetMBean.class)) {
- ObjectName metricObjectName = mbean.getObjectName();
- List<MetricItem> items = (List<MetricItem>)
mbs.invoke(metricObjectName,
- MetricItemMBean.METHOD_SNAPSHOT, null, null);
- for (MetricItem item : items) {
- String dimensionsKey = item.getDimensionsKey();
- Map<String, String> dimensions = item.getDimensions();
- Map<String, MetricValue> metrics = item.snapshot();
- MetricItemValue itemValue = new
MetricItemValue(dimensionsKey, dimensions, metrics);
- LOG.info("MetricItemSetMBean get itemValue:{}", itemValue);
- itemValues.add(itemValue);
- }
- }
- }
- return itemValues;
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
deleted file mode 100644
index edc438be1..000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Context;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.slf4j.Logger;
-
-/**
- *
- * MetricObserver
- */
-public class MetricObserver {
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(MetricObserver.class);
- private static final AtomicBoolean isInited = new AtomicBoolean(false);
- private static ScheduledExecutorService statExecutor =
Executors.newScheduledThreadPool(5);
-
- /**
- * init
- *
- * @param commonProperties
- */
- public static void init(Map<String, String> commonProperties) {
- if (!isInited.compareAndSet(false, true)) {
- return;
- }
- // init
- Context context = new Context(commonProperties);
- // get domain name list
- String metricDomains =
context.getString(MetricListener.KEY_METRIC_DOMAINS);
- if (StringUtils.isBlank(metricDomains)) {
- return;
- }
- // split domain name
- String[] domains = metricDomains.split("\\s+");
- for (String domain : domains) {
- // get domain parameters
- Context domainContext = new Context(
- context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS
+ "." + domain + "."));
- List<MetricListener> listenerList = parseDomain(domain,
domainContext);
- // no listener
- if (listenerList == null || listenerList.size() <= 0) {
- continue;
- }
- // get snapshot interval
- long snapshotInterval =
domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L);
- LOG.info("begin to register
domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList,
- snapshotInterval);
- statExecutor.scheduleWithFixedDelay(new
MetricListenerRunnable(domain, listenerList), snapshotInterval,
- snapshotInterval, TimeUnit.MILLISECONDS);
- }
-
- }
-
- /**
- * parseDomain
- *
- * @param domain
- * @param context
- * @return
- */
- private static List<MetricListener> parseDomain(String domain, Context
domainContext) {
- String listeners =
domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS);
- if (StringUtils.isBlank(listeners)) {
- return null;
- }
- String[] listenerTypes = listeners.split("\\s+");
- List<MetricListener> listenerList = new ArrayList<>();
- for (String listenerType : listenerTypes) {
- // new listener object
- try {
- Class<?> listenerClass = ClassUtils.getClass(listenerType);
- Object listenerObject =
listenerClass.getDeclaredConstructor().newInstance();
- if (listenerObject == null || !(listenerObject instanceof
MetricListener)) {
- LOG.error("{} is not instance of MetricListener.",
listenerType);
- continue;
- }
- final MetricListener listener = (MetricListener)
listenerObject;
- listenerList.add(listener);
- } catch (Throwable t) {
- LOG.error("Fail to init MetricListener:{},error:{}",
- listenerType, t.getMessage());
- continue;
- }
- }
- return listenerList;
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
deleted file mode 100644
index 8ecc39011..000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.metrics;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.metric.MetricUtils;
-import org.apache.inlong.common.metric.MetricValue;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- *
- * TestMetricItemSetMBean
- */
-public class TestMetricListenerRunnable {
-
- public static final String CLUSTER_ID = "inlong5th_sz";
- public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001";
- public static final String CONTAINER_IP = "127.0.0.1";
- private static final String SOURCE_ID = "agent-source";
- private static final String SOURCE_DATA_ID = "12069";
- private static final String INLONG_GROUP_ID1 = "03a00000026";
- private static final String INLONG_GROUP_ID2 = "03a00000126";
- private static final String INLONG_STREAM_ID = "";
- private static final String SINK_ID = "inlong5th-pulsar-sz";
- private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
- private static SortMetricItemSet itemSet;
- private static Map<String, String> dimSource;
- private static Map<String, String> dimSink;
- private static String keySource1;
- private static String keySource2;
- private static String keySink1;
- private static String keySink2;
-
- /**
- * setup
- */
- @BeforeClass
- public static void setup() {
- itemSet = new SortMetricItemSet(CLUSTER_ID);
- MetricRegister.register(itemSet);
- // prepare
- SortMetricItem itemSource = new SortMetricItem();
- itemSource.clusterId = CLUSTER_ID;
- itemSource.sourceId = SOURCE_ID;
- itemSource.sourceDataId = SOURCE_DATA_ID;
- itemSource.inlongGroupId = INLONG_GROUP_ID1;
- itemSource.inlongStreamId = INLONG_STREAM_ID;
- dimSource = itemSource.getDimensions();
- //
- SortMetricItem itemSink = new SortMetricItem();
- itemSink.clusterId = CLUSTER_ID;
- itemSink.sinkId = SINK_ID;
- itemSink.sinkDataId = SINK_DATA_ID;
- itemSink.inlongGroupId = INLONG_GROUP_ID1;
- itemSink.inlongStreamId = INLONG_STREAM_ID;
- dimSink = itemSink.getDimensions();
- }
-
- /**
- * setdown
- */
- @AfterClass
- public static void setdown() {
- MetricRegister.unregister(itemSet);
- }
-
- /**
- * testResult
- *
- * @throws Exception
- */
- @Test
- public void testResult() throws Exception {
- // increase source
- SortMetricItem item = null;
- item = itemSet.findMetricItem(dimSource);
- item.readSuccessCount.incrementAndGet();
- item.readSuccessSize.addAndGet(100);
- keySource1 = MetricUtils.getDimensionsKey(dimSource);
- //
- dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
- item = itemSet.findMetricItem(dimSource);
- item.readFailCount.addAndGet(20);
- item.readFailSize.addAndGet(2000);
- keySource2 = MetricUtils.getDimensionsKey(dimSource);
- // increase sink
- item = itemSet.findMetricItem(dimSink);
- item.sendCount.incrementAndGet();
- item.sendSize.addAndGet(100);
- item.sendSuccessCount.incrementAndGet();
- item.sendSuccessSize.addAndGet(100);
- keySink1 = MetricUtils.getDimensionsKey(dimSink);
- //
- dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
- item = itemSet.findMetricItem(dimSink);
- item.sendCount.addAndGet(20);
- item.sendSize.addAndGet(2000);
- item.sendFailCount.addAndGet(20);
- item.sendFailSize.addAndGet(2000);
- keySink2 = MetricUtils.getDimensionsKey(dimSink);
- // report
- MetricListener listener = new MetricListener() {
-
- @Override
- public void snapshot(String domain, List<MetricItemValue>
itemValues) {
- assertEquals("Sort", domain);
- for (MetricItemValue itemValue : itemValues) {
- String key = itemValue.getKey();
- Map<String, MetricValue> metricMap =
itemValue.getMetrics();
- if (keySource1.equals(itemValue.getKey())) {
- assertEquals(1,
metricMap.get("readSuccessCount").value);
- assertEquals(100,
metricMap.get("readSuccessSize").value);
- } else if (keySource2.equals(key)) {
- assertEquals(20, metricMap.get("readFailCount").value);
- assertEquals(2000,
metricMap.get("readFailSize").value);
- } else if (keySink1.equals(key)) {
- assertEquals(1, metricMap.get("sendCount").value);
- assertEquals(100, metricMap.get("sendSize").value);
- assertEquals(1,
metricMap.get("sendSuccessCount").value);
- assertEquals(100,
metricMap.get("sendSuccessSize").value);
- } else if (keySink2.equals(key)) {
- assertEquals(20, metricMap.get("sendCount").value);
- assertEquals(2000, metricMap.get("sendSize").value);
- assertEquals(20, metricMap.get("sendFailCount").value);
- assertEquals(2000,
metricMap.get("sendFailSize").value);
- } else {
- System.out.println("bad MetricItem:" + key);
- }
- }
- }
- };
- List<MetricListener> listeners = new ArrayList<>();
- listeners.add(listener);
- MetricListenerRunnable runnable = new MetricListenerRunnable("Sort",
listeners);
- List<MetricItemValue> itemValues = runnable.getItemValues();
- runnable.run();
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index 4faac77a9..362a192a3 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sort.standalone;
import org.apache.flume.node.Application;
+import org.apache.inlong.common.metric.MetricObserver;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.metrics.MetricObserver;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
index 887a6ea74..b760df206 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -47,10 +47,11 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
import org.apache.inlong.common.metric.MetricValue;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.metrics.MetricItemValue;
-import org.apache.inlong.sort.standalone.metrics.MetricListener;
+
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;