This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 45b6e0d53f [INLONG-8766][SDK] SortSDK create consumer in parallel 
(#8784)
45b6e0d53f is described below

commit 45b6e0d53f947b87569ae50f01b406e8fc39c0c8
Author: vernedeng <[email protected]>
AuthorDate: Tue Aug 29 16:15:02 2023 +0800

    [INLONG-8766][SDK] SortSDK create consumer in parallel (#8784)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |  10 +
 .../inlong/sdk/sort/api/ConfigConstants.java       |   6 +
 .../sdk/sort/api/InlongTopicManagerFactory.java    |   9 +
 .../inlong/sdk/sort/api/SortClientConfig.java      |  36 ++
 .../inlong/sdk/sort/api/SortClientFactory.java     |   9 +
 .../apache/inlong/sdk/sort/api/TopicFetcher.java   |  17 +
 .../apache/inlong/sdk/sort/api/TopicManager.java   |  21 +
 .../sdk/sort/impl/QueryConsumeConfigImpl.java      |  43 +-
 .../inlong/sdk/sort/impl/SortClientImplV2.java     | 153 +++++++
 .../sdk/sort/manager/InlongMultiTopicManager.java  |   3 +
 .../sdk/sort/manager/InlongSingleTopicManager.java |   2 +
 .../sdk/sort/manager/InlongTopicManager.java       | 489 +++++++++++++++++++++
 .../inlong/sdk/sort/metrics/SortSdkMetricItem.java |  13 +-
 .../metrics/SortSdkPrometheusMetricListener.java   |  15 +-
 14 files changed, 806 insertions(+), 20 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index 4420fcfc72..fefb4100f9 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -142,6 +142,16 @@ public abstract class ClientContext implements Cleanable {
         metricItem.requestManagerParamErrorCount.incrementAndGet();
     }
 
+    /**
+     * Increments the {@code reqeustManagerEmptyCount} counter (field name as declared on
+     * {@link SortSdkMetricItem}) of the metric item looked up with no topic and partition id -1.
+     */
+    public void addRequestManagerEmptyError() {
+        this.getMetricItem(null, -1).reqeustManagerEmptyCount.incrementAndGet();
+    }
+
+    /**
+     * Increments the {@code requestManagerTopicsChangeOutOfThreshold} counter of the metric item
+     * looked up with no topic and partition id -1.
+     */
+    public void addRequestManagerTopicsChangeOutOfThreshold() {
+        this.getMetricItem(null, -1).requestManagerTopicsChangeOutOfThreshold.incrementAndGet();
+    }
+
     private SortSdkMetricItem getMetricItem(InLongTopic topic, int 
partitionId) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
index 32136d470b..52be3a1cb4 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
@@ -54,4 +54,10 @@ public class ConfigConstants {
     public static final String IS_TOPIC_STATICS_ENABLED = 
"isTopicStaticsEnabled";
     public static final String IS_PARTITION_STATICS_ENABLED = 
"isPartitionStaticsEnabled";
 
+    // Sort SDK parameter key; SortClientConfig parses its value into maxOfflineTopicPercent.
+    public static final String MAX_OFFLINE_TOPIC = "maxOfflineTopic";
+
+    // Sort SDK parameter key; SortClientConfig parses its value into startOfflineTopicCheckThreshold.
+    public static final String START_OFFLINE_CHECK_THRESHOLD = "startOfflineCheckThreshold";
+
+    // Sort SDK parameter key; SortClientConfig parses its value into threadPoolSize.
+    public static final String THREAD_POOL_SIZE = "threadPoolSize";
+
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
index 946e926e5c..eb6e9710af 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.sort.api;
 import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType;
 import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager;
 import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
+import org.apache.inlong.sdk.sort.manager.InlongTopicManager;
 
 /**
  * Inlong topic manager factory.
@@ -27,6 +28,14 @@ import 
org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
  */
 public class InlongTopicManagerFactory {
 
+    /**
+     * Since 1.9.0
+     */
+    public static TopicManager createInlongTopicManagerV2(
+            ClientContext context,
+            QueryConsumeConfig queryConsumeConfig) {
+        return new InlongTopicManager(context, queryConsumeConfig);
+    }
     public static TopicManager createInLongTopicManager(
             TopicType type,
             ClientContext context,
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index 895038490f..b9776265b4 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -72,6 +72,11 @@ public class SortClientConfig implements Serializable {
     private boolean topicStaticsEnabled = true;
     private boolean partitionStaticsEnabled = true;
 
+    // QueryConsumeConfigImpl#checkTopics skips its check while fewer topics than this are subscribed.
+    private int startOfflineTopicCheckThreshold = 50;
+    // Percentage bound that QueryConsumeConfigImpl#checkTopics applies to the change in topic count.
+    private int maxOfflineTopicPercent = 5;
+
+    // Passed to the ForkJoinPool constructor in InlongTopicManager.
+    private int threadPoolSize = 50;
+
     public SortClientConfig(
             String sortTaskId,
             String sortClusterName,
@@ -379,6 +384,30 @@ public class SortClientConfig implements Serializable {
         return partitionStaticsEnabled;
     }
 
+    /**
+     * @return the configured {@code maxOfflineTopicPercent}
+     */
+    public int getMaxOfflineTopicPercent() {
+        return this.maxOfflineTopicPercent;
+    }
+
+    /**
+     * Stores the value as-is; no range validation is performed here.
+     *
+     * @param maxOfflineTopicPercent new value of {@code maxOfflineTopicPercent}
+     */
+    public void setMaxOfflineTopicPercent(final int maxOfflineTopicPercent) {
+        this.maxOfflineTopicPercent = maxOfflineTopicPercent;
+    }
+
+    /**
+     * @return the configured {@code startOfflineTopicCheckThreshold}
+     */
+    public int getStartOfflineTopicCheckThreshold() {
+        return this.startOfflineTopicCheckThreshold;
+    }
+
+    /**
+     * Stores the value as-is; no range validation is performed here.
+     *
+     * @param startOfflineTopicCheckThreshold new value of {@code startOfflineTopicCheckThreshold}
+     */
+    public void setStartOfflineTopicCheckThreshold(final int startOfflineTopicCheckThreshold) {
+        this.startOfflineTopicCheckThreshold = startOfflineTopicCheckThreshold;
+    }
+
+    /**
+     * @return the configured {@code threadPoolSize}
+     */
+    public int getThreadPoolSize() {
+        return this.threadPoolSize;
+    }
+
+    /**
+     * Stores the value as-is; no range validation is performed here.
+     *
+     * @param threadPoolSize new value of {@code threadPoolSize}
+     */
+    public void setThreadPoolSize(final int threadPoolSize) {
+        this.threadPoolSize = threadPoolSize;
+    }
+
     /**
      * ConsumeStrategy
      */
@@ -469,6 +498,13 @@ public class SortClientConfig implements Serializable {
                 Boolean.TRUE.toString());
         this.partitionStaticsEnabled = 
StringUtils.equalsIgnoreCase(strPartitionStaticsEnabled,
                 Boolean.TRUE.toString());
+
+        this.maxOfflineTopicPercent =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_OFFLINE_TOPIC), 
maxOfflineTopicPercent);
+        this.startOfflineTopicCheckThreshold =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.START_OFFLINE_CHECK_THRESHOLD),
+                        startOfflineTopicCheckThreshold);
+        this.threadPoolSize = 
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.THREAD_POOL_SIZE), 
threadPoolSize);
     }
 
     public List<InLongTopic> getConsumerSubset(List<InLongTopic> totalTopics) {
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
index ba18f621e7..86a119751b 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.sort.api;
 
 import org.apache.inlong.sdk.sort.impl.SortClientImpl;
+import org.apache.inlong.sdk.sort.impl.SortClientImplV2;
 
 /**
  * Factory of {@link SortClient}
@@ -36,4 +37,12 @@ public class SortClientFactory {
     public static SortClient createSortClient(SortClientConfig config, 
QueryConsumeConfig queryConsumeConfig) {
         return new SortClientImpl(config, queryConsumeConfig);
     }
+
+    public static SortClient createSortClientV2(SortClientConfig config) {
+        return new SortClientImplV2(config);
+    }
+
+    public static SortClient createSortClientV2(SortClientConfig config, 
QueryConsumeConfig queryConsumeConfig) {
+        return new SortClientImplV2(config, queryConsumeConfig);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
index 536ed3ac45..682e678de0 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.sort.api;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface of all type of topic fetchers.
@@ -32,6 +33,14 @@ public interface TopicFetcher {
      */
     boolean init();
 
+    /**
+     * Initializes the topic fetcher asynchronously.
+     * The default implementation runs {@link #init()} through
+     * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}, i.e. without an explicit executor.
+     *
+     * @return future completed with the result of {@link #init()}
+     */
+    default CompletableFuture<Boolean> initAsync() {
+        return CompletableFuture.supplyAsync(() -> init());
+    }
+
     /**
      * Ack message by the given msgOffset.
      * @param msgOffset Offset of message.
@@ -61,6 +70,14 @@ public interface TopicFetcher {
      */
     boolean close();
 
+    /**
+     * Closes the topic fetcher asynchronously.
+     * The default implementation runs {@link #close()} through
+     * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}, i.e. without an explicit executor.
+     *
+     * @return future completed with the result of {@link #close()}
+     */
+    default CompletableFuture<Boolean> closeAsync() {
+        return CompletableFuture.supplyAsync(() -> close());
+    }
+
     /**
      * Get if the fetcher is closed or not.
      * @return Closed or not.
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
index 0759c925f6..8f76142862 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicManager.java
@@ -50,6 +50,15 @@ public abstract class TopicManager implements Cleanable {
      */
     public abstract TopicFetcher removeTopic(InLongTopic topic, boolean 
closeFetcher);
 
+    /**
+     * Remove topic and return the fetcher that has maintained this topic.
+     * <p>
+     * This base implementation removes nothing and always returns {@code null};
+     * managers that support removal by key are expected to override it.
+     *
+     * @param topicKey Topic key to be removed.
+     * @return The fetcher that has maintained this topic; {@code null} in this base implementation.
+     */
+    public TopicFetcher removeTopic(String topicKey) {
+        return null;
+    }
+
     /**
      * Get the specified fetcher by the given fetch key.
      * @param fetchKey Unique fetch key.
@@ -78,4 +87,16 @@ public abstract class TopicManager implements Cleanable {
      * Close manager.
      */
     public abstract void close();
+
+    /**
+     * Restart topic assignment.
+     * This base implementation is a no-op; subclasses may override it.
+     */
+    public void restartAssigned() {
+
+    }
+
+    /**
+     * Stop topic assignment.
+     * This base implementation is a no-op; subclasses may override it.
+     */
+    public void stopAssigned() {
+
+    }
+
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 7e3f1e3d44..32c74013ba 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -53,10 +53,12 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
     private final Logger logger = 
LoggerFactory.getLogger(QueryConsumeConfigImpl.class);
     private final CloseableHttpClient httpClient = HttpClients.createDefault();
 
+    private final ObjectMapper mapper = new ObjectMapper();
+
     private ClientContext clientContext;
     private String md5 = "";
 
-    private Map<String, List<InLongTopic>> subscribedTopic = new HashMap<>();
+    private List<InLongTopic> subscribedTopic = new ArrayList<>();
 
     public QueryConsumeConfigImpl(ClientContext clientContext) {
         this.clientContext = clientContext;
@@ -88,7 +90,7 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
             String result = EntityUtils.toString(entity);
             logger.debug("response String result:{}", result);
             try {
-                managerResponse = new ObjectMapper().readValue(result, 
SortSourceConfigResponse.class);
+                managerResponse = mapper.readValue(result, 
SortSourceConfigResponse.class);
                 return managerResponse;
             } catch (Exception e) {
                 logger.error("parse json to ManagerResponse error:{}", 
e.getMessage(), e);
@@ -159,7 +161,7 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
                 break;
             default:
                 logger.error("return code error:{},request:{},response:{}",
-                        respCodeValue, getUrl, new 
ObjectMapper().writeValueAsString(response));
+                        respCodeValue, getUrl, 
mapper.writeValueAsString(response));
                 clientContext.addRequestManagerCommonError();
                 return true;
         }
@@ -168,12 +170,10 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
 
     private void updateSortTaskConf(SortSourceConfigResponse response) {
         CacheZoneConfig cacheZoneConfig = response.getData();
-        Map<String, List<InLongTopic>> newGroupTopicsMap = new HashMap<>();
+        List<InLongTopic> newGroupTopics = new ArrayList<>();
+
         for (Map.Entry<String, CacheZone> entry : 
cacheZoneConfig.getCacheZones().entrySet()) {
             CacheZone cacheZone = entry.getValue();
-
-            List<InLongTopic> topics = 
newGroupTopicsMap.computeIfAbsent(cacheZoneConfig.getSortTaskId(),
-                    k -> new ArrayList<>());
             CacheZoneCluster cacheZoneCluster = new 
CacheZoneCluster(cacheZone.getZoneName(),
                     cacheZone.getServiceUrl(), cacheZone.getAuthentication());
             for (Topic topicInfo : cacheZone.getTopics()) {
@@ -182,11 +182,32 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
                 topic.setTopic(topicInfo.getTopic());
                 topic.setTopicType(cacheZone.getZoneType());
                 topic.setProperties(topicInfo.getTopicProperties());
-                topics.add(topic);
+                newGroupTopics.add(topic);
             }
         }
 
-        this.subscribedTopic = newGroupTopicsMap;
+        // Reject an EMPTY result and keep the previously subscribed topics. The original condition
+        // (isNotEmpty) was inverted: it discarded every non-empty config and let empty ones through.
+        if (CollectionUtils.isEmpty(newGroupTopics)) {
+            clientContext.addRequestManagerEmptyError();
+            logger.info("failed to update sort sdk config, the updated conf is empty");
+            return;
+        }
+
+        if (CollectionUtils.isNotEmpty(subscribedTopic) && 
!checkTopics(newGroupTopics)) {
+            clientContext.addRequestManagerTopicsChangeOutOfThreshold();
+            logger.info("failed to update sort sdk config, the updated size 
is={}, the old size={}",
+                    newGroupTopics.size(), subscribedTopic.size());
+            return;
+        }
+
+        this.subscribedTopic = newGroupTopics;
+    }
+
+    /**
+     * Decides whether a freshly queried topic list may replace the current one.
+     * <p>
+     * The check is skipped (update accepted) while fewer than
+     * {@code startOfflineTopicCheckThreshold} topics are subscribed. Otherwise the update is accepted
+     * only if the share of topics that disappeared, measured by list size, stays below
+     * {@code maxOfflineTopicPercent}. Growth yields a negative share and is always accepted.
+     *
+     * @param newGroupTopics topics parsed from the latest manager response
+     * @return {@code true} if the new list may be applied
+     */
+    private boolean checkTopics(List<InLongTopic> newGroupTopics) {
+        int oldSize = subscribedTopic.size();
+        // oldSize == 0 guards the division below even if the threshold is configured <= 0
+        if (oldSize == 0 || oldSize < clientContext.getConfig().getStartOfflineTopicCheckThreshold()) {
+            return true;
+        }
+        // old - new: positive when topics went offline (the original new - old had the sign reversed)
+        long offlinePercent = (long) (oldSize - newGroupTopics.size()) * 100 / oldSize;
+        return offlinePercent < clientContext.getConfig().getMaxOfflineTopicPercent();
     }
 
     /**
@@ -198,7 +219,7 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
     @Override
     public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
         reload();
-        return new ConsumeConfig(subscribedTopic.get(sortTaskId));
+        return new ConsumeConfig(subscribedTopic);
     }
 
     @Override
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
new file mode 100644
index 0000000000..da90b64643
--- /dev/null
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl;
+
+import org.apache.inlong.sdk.sort.api.Cleanable;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.exception.NotExistException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * New version of sort client.
+ */
+public class SortClientImplV2 extends SortClient {
+
+    private final String logPrefix = "[" + 
SortClientImpl.class.getSimpleName() + "] ";
+    private final Logger logger = 
LoggerFactory.getLogger(SortClientImpl.class);
+
+    private final SortClientConfig sortClientConfig;
+
+    private final ClientContext context;
+
+    private final TopicManager inLongTopicManager;
+
+    /**
+     * SortClient Constructor
+     *
+     * @param sortClientConfig SortClientConfig
+     */
+    public SortClientImplV2(SortClientConfig sortClientConfig) {
+        try {
+            this.sortClientConfig = sortClientConfig;
+            this.context = new ClientContextImpl(this.sortClientConfig);
+
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInlongTopicManagerV2(context, new 
QueryConsumeConfigImpl(context));
+        } catch (Exception e) {
+            this.close();
+            throw e;
+        }
+    }
+
+    /**
+     * SortClient Constructor with user defined 
QueryConsumeConfig,MetricReporter and ManagerReportHandler
+     *
+     * @param sortClientConfig SortClientConfig
+     * @param queryConsumeConfig QueryConsumeConfig
+     */
+    public SortClientImplV2(SortClientConfig sortClientConfig, 
QueryConsumeConfig queryConsumeConfig) {
+        try {
+            this.sortClientConfig = sortClientConfig;
+            this.context = new ClientContextImpl(this.sortClientConfig);
+            queryConsumeConfig.configure(context);
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInlongTopicManagerV2(context, queryConsumeConfig);
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.close();
+            throw e;
+        }
+    }
+
+    /**
+     * init SortClient
+     *
+     * @return true/false
+     * @throws Throwable
+     */
+    @Override
+    public boolean init() throws Throwable {
+        logger.info(logPrefix + "init|" + sortClientConfig);
+        return true;
+    }
+
+    /**
+     * ack offset to msgKey
+     *
+     * @param msgKey String
+     * @param msgOffset String
+     * @throws Exception
+     */
+    @Override
+    public void ack(String msgKey, String msgOffset)
+            throws Exception {
+        logger.debug("ack:{} offset:{}", msgKey, msgOffset);
+        TopicFetcher topicFetcher = getFetcher(msgKey);
+        topicFetcher.ack(msgOffset);
+    }
+
+    /**
+     * close SortClient
+     *
+     * @return true/false
+     */
+    @Override
+    public boolean close() {
+        boolean cleanInLongTopicManager = doClose(inLongTopicManager);
+        boolean cleanContext = doClose(context);
+
+        logger.info(logPrefix
+
+                + "|cleanInLongTopicManager=" + cleanInLongTopicManager
+                + "|cleanContext=" + cleanContext);
+        return (cleanInLongTopicManager && cleanContext);
+    }
+
+    @Override
+    public SortClientConfig getConfig() {
+        return this.sortClientConfig;
+    }
+
+    private TopicFetcher getFetcher(String msgKey) throws NotExistException {
+        TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
+        if (topicFetcher == null) {
+            throw new NotExistException(msgKey + " not exist.");
+        }
+        return topicFetcher;
+    }
+
+    private boolean doClose(Cleanable c) {
+        try {
+            if (c != null) {
+                return c.clean();
+            }
+            return true;
+        } catch (Throwable th) {
+            logger.error(logPrefix + "clean error.", th);
+            return false;
+        }
+    }
+}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
index de30a3f308..744b38d223 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -56,7 +56,10 @@ import java.util.stream.Collectors;
  * Inlong manager that maintain the {@link 
org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}.
  * It is suitable to the cases that topics share the same configurations.
  * And each consumer will consume multi topic.
+ *
+ * InlongMultiTopicManager was deprecated since 1.9.0
  */
+@Deprecated
 public class InlongMultiTopicManager extends TopicManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongMultiTopicManager.class);
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index a59e81d80d..503304c77f 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -57,7 +57,9 @@ import java.util.stream.Collectors;
  * Inlong manager that maintain the single topic fetchers.
  * It is suitable to the cases that each topic has its own configurations.
  * And each consumer only consume the very one topic.
+ * InlongSingleTopicManager was deprecated since 1.9.0
  */
+@Deprecated
 public class InlongSingleTopicManager extends TopicManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongSingleTopicManager.class);
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
new file mode 100644
index 0000000000..f594126152
--- /dev/null
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintain the single topic fetchers.
+ * It is suitable to the cases that each topic has its own configurations.
+ * And each consumer only consume the very one topic.
+ * InlongTopicManager has been used since 1.9.0
+ */
+public class InlongTopicManager extends TopicManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongTopicManager.class);
+
+    // Single-threaded scheduler; the constructor uses it to run updateMetaData periodically
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    // Fetchers keyed by topic key (see removeTopic)
+    private final Map<String, TopicFetcher> fetchers = new ConcurrentHashMap<>();
+    // Pulsar clients keyed by cluster id (see closePulsarClient)
+    private final Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
+    // Tube consumer creators keyed by cluster id (see closeTubeFactory)
+    private final Map<String, TubeConsumerCreator> tubeFactories = new ConcurrentHashMap<>();
+
+    // Pool the closeAll* helpers submit their parallel streams to; sized from the client config
+    protected final ForkJoinPool pool;
+
+    // When true, updateMetaData returns without querying the config
+    private volatile boolean stopAssign = false;
+
+    // Topics computed by the latest updateMetaData run
+    private Collection<InLongTopic> assignedTopics;
+
+    /**
+     * Creates the manager and starts the periodic metadata refresh.
+     *
+     * @param context client context; supplies the thread pool size and the refresh interval
+     * @param queryConsumeConfig consume-config source passed to the parent class
+     */
+    public InlongTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
+        super(context, queryConsumeConfig);
+        // Create the pool BEFORE scheduling: the first refresh is scheduled with zero delay on the
+        // executor thread and must not observe a not-yet-assigned pool. It also means an invalid
+        // pool size fails here without leaving a running scheduler behind.
+        pool = new ForkJoinPool(context.getConfig().getThreadPoolSize());
+        executor.scheduleWithFixedDelay(this::updateMetaData, 0L,
+                context.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public boolean clean() {
+        String sortTaskId = context.getConfig().getSortTaskId();
+        try {
+            LOGGER.info("start to clean topic manager, sortTaskId={}", 
sortTaskId);
+            stopAssign = true;
+            closeAllFetchers();
+            closeAllPulsarClients();
+            closeAllTubeFactories();
+            LOGGER.info("success to clean topic manager, sortTaskId={}", 
sortTaskId);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("failed to clean topic manager, sortTaskId={}", 
sortTaskId, e);
+        } finally {
+            fetchers.clear();
+            pulsarClients.clear();
+            tubeFactories.clear();
+            stopAssign = false;
+        }
+        return false;
+    }
+
+    /**
+     * Unregisters the fetcher stored under the given key and closes it.
+     * A successful close is counted as one offline topic; a failing close is only logged.
+     *
+     * @param topicKey key of the fetcher to remove
+     * @return the removed fetcher, or null if none was registered under the key
+     */
+    @Override
+    public TopicFetcher removeTopic(String topicKey) {
+        LOGGER.info("start to close fetcher key={} ", topicKey);
+        TopicFetcher removed = fetchers.remove(topicKey);
+        if (removed == null) {
+            return null;
+        }
+        try {
+            removed.close();
+            context.addTopicOfflineCount(1);
+        } catch (Exception e) {
+            LOGGER.error("close fetcher failed, key={}", topicKey, e);
+        }
+        return removed;
+    }
+
+    /**
+     * Removes and closes every registered fetcher in parallel on {@code pool} and waits for completion.
+     * Waiting matters: callers such as clean() clear the registry right afterwards, and a task that
+     * has not started yet would then find nothing left to close.
+     */
+    private void closeAllFetchers() {
+        pool.submit(() -> fetchers.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::removeTopic))
+                .join();
+    }
+
+    /**
+     * Removes and closes every registered pulsar client in parallel on {@code pool} and waits for
+     * completion, so the registry is not cleared by a caller before the clients were actually closed.
+     */
+    private void closeAllPulsarClients() {
+        pool.submit(() -> pulsarClients.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::closePulsarClient))
+                .join();
+    }
+
+    /**
+     * Removes and shuts down every registered tube factory in parallel on {@code pool} and waits for
+     * completion, so the registry is not cleared by a caller before the factories were actually shut down.
+     */
+    private void closeAllTubeFactories() {
+        pool.submit(() -> tubeFactories.keySet()
+                .stream()
+                .parallel()
+                .forEach(this::closeTubeFactory))
+                .join();
+    }
+
+    /**
+     * Unregisters the tube consumer creator of the given cluster and shuts down its session factory.
+     * A failing shutdown is logged (with its cause) and otherwise ignored.
+     *
+     * @param clusterId key of the creator to remove
+     * @return the removed creator, or null if none was registered
+     */
+    private TubeConsumerCreator closeTubeFactory(String clusterId) {
+        LOGGER.info("start to close tube creator id = {}", clusterId);
+        TubeConsumerCreator creator = tubeFactories.remove(clusterId);
+        try {
+            if (creator != null) {
+                creator.getMessageSessionFactory().shutdown();
+            }
+        } catch (Exception e) {
+            // pass the exception so the cause and stack trace are not lost
+            LOGGER.error("close tube factory failed, client id = {}", clusterId, e);
+        }
+        return creator;
+    }
+
+    /**
+     * Unregisters the pulsar client of the given cluster and closes it.
+     * A failing close is logged (with its cause) and otherwise ignored.
+     *
+     * @param clusterId key of the client to remove
+     * @return the removed client, or null if none was registered
+     */
+    private PulsarClient closePulsarClient(String clusterId) {
+        LOGGER.info("start to close pulsar client id = {}", clusterId);
+        PulsarClient client = pulsarClients.remove(clusterId);
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } catch (Exception e) {
+            // pass the exception so the cause and stack trace are not lost
+            LOGGER.error("close pulsar client failed, client id = {}", clusterId, e);
+        }
+        return client;
+    }
+
+    @Override
+    public TopicFetcher addTopic(InLongTopic topic) {
+        checkAndOnlineCluster(topic);
+        return onlineNewTopic(topic);
+    }
+
+    /**
+     * Unregisters the fetcher of the given topic and optionally closes it.
+     *
+     * @param topic topic whose fetcher is removed
+     * @param closeFetcher whether the removed fetcher is closed as well
+     * @return the removed fetcher, or null if none was registered
+     */
+    @Override
+    public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+        LOGGER.info("start to remove topicKey={}", topic.getTopicKey());
+        TopicFetcher removed = fetchers.remove(topic.getTopicKey());
+        if (closeFetcher && removed != null) {
+            removed.close();
+        }
+        return removed;
+    }
+
+    @Override
+    public TopicFetcher getFetcher(String fetchKey) {
+        return fetchers.get(fetchKey);
+    }
+
+    /**
+     * @return a new list holding the currently registered fetchers
+     */
+    @Override
+    public Collection<TopicFetcher> getAllFetchers() {
+        Collection<TopicFetcher> snapshot = new ArrayList<>(fetchers.values());
+        return snapshot;
+    }
+
+    /**
+     * @return a new set holding the keys of the currently registered fetchers
+     */
+    @Override
+    public Set<String> getManagedInLongTopics() {
+        Set<String> keys = new HashSet<>(fetchers.keySet());
+        return keys;
+    }
+
+    /**
+     * Stops topic assignment and then closes all fetchers.
+     */
+    @Override
+    public void offlineAllTopicsAndPartitions() {
+        // raise the flag first so updateMetaData returns early from now on
+        this.stopAssign = true;
+        this.closeAllFetchers();
+    }
+
+    /**
+     * Shuts down {@code executor} if it has not been shut down yet, then calls {@code clean()}.
+     */
+    @Override
+    public void close() {
+        final boolean executorRunning = !executor.isShutdown();
+        if (executorRunning) {
+            executor.shutdown();
+        }
+        clean();
+    }
+
+    /**
+     * Clears the {@code stopAssign} flag so that updateMetaData stops returning early.
+     */
+    @Override
+    public void restartAssigned() {
+        this.stopAssign = false;
+    }
+
+    /**
+     * Sets the {@code stopAssign} flag so that updateMetaData returns early without querying the config.
+     */
+    @Override
+    public void stopAssigned() {
+        this.stopAssign = true;
+    }
+
+    /**
+     * Queries the current consume config for this sort task and, when one is returned, replaces
+     * {@code assignedTopics} with the consumer subset of its topics and applies the change.
+     * <p>
+     * Does nothing while assignment is stopped or when no {@code queryConsumeConfig} is available. A
+     * {@code null} query result is reported through {@code context.addRequestManagerFail} with the elapsed
+     * time in milliseconds.
+     */
+    private void updateMetaData() {
+        LOGGER.debug("InLongTopicManager doWork");
+        if (stopAssign) {
+            LOGGER.warn("assign is stopped");
+            return;
+        }
+        if (queryConsumeConfig == null) {
+            LOGGER.error("subscribedMetaDataInfo is null");
+            return;
+        }
+
+        // get sortTask conf from manager
+        final long requestStart = System.currentTimeMillis();
+        context.addRequestManager();
+        final ConsumeConfig fetched =
+                queryConsumeConfig.queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
+        if (fetched == null) {
+            LOGGER.warn("subscribedInfo is null");
+            context.addRequestManagerFail(System.currentTimeMillis() - requestStart);
+            return;
+        }
+
+        this.assignedTopics = new HashSet<>(context.getConfig().getConsumerSubset(fetched.getTopics()));
+        handleUpdatedConsumeConfig();
+    }
+
+    /**
+     * Applies the current {@code assignedTopics}: brings cluster clients and topics online, takes removed
+     * topics offline, updates the remaining ones and finally releases cluster clients that are no longer
+     * assigned. Skipped entirely when {@code assignedTopics} is null or empty.
+     */
+    private void handleUpdatedConsumeConfig() {
+        LOGGER.info("start to handle updated consume config");
+        final boolean nothingAssigned = CollectionUtils.isEmpty(assignedTopics);
+        if (nothingAssigned) {
+            LOGGER.warn("assignedTopics is null or empty, do nothing");
+            return;
+        }
+
+        // cluster-level clients first
+        onlinePulsarClients();
+        onlineTubeFactories();
+
+        // then the topics themselves
+        offlineRemovedTopics();
+        onlineNewTopics();
+        updateCurrentTopics();
+
+        // finally drop clients of clusters that are no longer assigned
+        offlinePulsarClients();
+        offlineTubeFactories();
+
+        LOGGER.info("end to handle updated consume config");
+    }
+
+    /**
+     * Submits a task to {@code pool} that closes every registered tube factory whose cluster is not among
+     * the currently assigned tube clusters.
+     */
+    private void offlineTubeFactories() {
+        // ids of assigned tube clusters that already have a factory; these must be kept
+        final Set<String> keptIds = new HashSet<>();
+        for (CacheZoneCluster assigned : this.getCacheZoneClusters(InlongTopicTypeEnum.TUBE)) {
+            final String assignedId = assigned.getClusterId();
+            if (tubeFactories.containsKey(assignedId)) {
+                keptIds.add(assignedId);
+            }
+        }
+        pool.submit(() -> {
+            // iterate a snapshot because offlineTubeFactory removes entries from tubeFactories
+            final Set<String> registeredIds = new HashSet<>(tubeFactories.keySet());
+            registeredIds.stream().parallel()
+                    .filter(registeredId -> !keptIds.contains(registeredId))
+                    .forEach(this::offlineTubeFactory);
+        });
+    }
+
+    /**
+     * Removes the tube consumer creator stored under {@code clientId} and shuts down its session factory.
+     * A failed shutdown is logged together with its cause and otherwise ignored.
+     *
+     * @param clientId key of the creator in {@code tubeFactories}
+     */
+    private void offlineTubeFactory(String clientId) {
+        TubeConsumerCreator client = tubeFactories.remove(clientId);
+        if (client == null) {
+            LOGGER.warn("when close tube client, find no client id={}", clientId);
+            return;
+        }
+        LOGGER.info("start to close tube clientId={}", clientId);
+        try {
+            client.getMessageSessionFactory().shutdown();
+            LOGGER.info("success to close tube clientId={}", clientId);
+        } catch (Exception e) {
+            // include the exception so the reason for the failed shutdown is not lost
+            LOGGER.warn("failed to close tube clientId={}", clientId, e);
+        }
+    }
+
+    /**
+     * Submits a task to {@code pool} that closes every registered pulsar client whose cluster is not among
+     * the currently assigned pulsar clusters.
+     */
+    private void offlinePulsarClients() {
+        // ids of assigned pulsar clusters that already have a client; these must be kept
+        final Set<String> keptIds = new HashSet<>();
+        for (CacheZoneCluster assigned : this.getCacheZoneClusters(InlongTopicTypeEnum.PULSAR)) {
+            final String assignedId = assigned.getClusterId();
+            if (pulsarClients.containsKey(assignedId)) {
+                keptIds.add(assignedId);
+            }
+        }
+        pool.submit(() -> {
+            // iterate a snapshot because offlinePulsarClient removes entries from pulsarClients
+            final Set<String> registeredIds = new HashSet<>(pulsarClients.keySet());
+            registeredIds.stream().parallel()
+                    .filter(registeredId -> !keptIds.contains(registeredId))
+                    .forEach(this::offlinePulsarClient);
+        });
+    }
+
+    /**
+     * Removes the pulsar client stored under {@code clientId} and closes it.
+     * A failed close is logged together with its cause and otherwise ignored.
+     *
+     * @param clientId key of the client in {@code pulsarClients}
+     */
+    private void offlinePulsarClient(String clientId) {
+        PulsarClient client = pulsarClients.remove(clientId);
+        if (client == null) {
+            LOGGER.warn("when close pulsar client, find no client id={}", clientId);
+            return;
+        }
+        LOGGER.info("start to close pulsar clientId={}", clientId);
+        try {
+            client.close();
+            LOGGER.info("success to close pulsar clientId={}", clientId);
+        } catch (Exception e) {
+            // include the exception so the reason for the failed close is not lost
+            LOGGER.warn("failed to close pulsar clientId={}", clientId, e);
+        }
+    }
+
+    /**
+     * Submits a task to {@code pool} that creates a tube consumer creator for every assigned tube cluster
+     * that has no entry in {@code tubeFactories} yet.
+     */
+    private void onlineTubeFactories() {
+        final List<CacheZoneCluster> missing = new ArrayList<>();
+        for (CacheZoneCluster assigned : this.getCacheZoneClusters(InlongTopicTypeEnum.TUBE)) {
+            if (!tubeFactories.containsKey(assigned.getClusterId())) {
+                missing.add(assigned);
+            }
+        }
+        pool.submit(() -> missing.stream().parallel().forEach(this::createTubeConsumerCreator));
+    }
+
+    /**
+     * Creates a tube consumer creator for the cluster and registers it in {@code tubeFactories} unless one
+     * is already registered for the cluster id, in which case the newly built session factory is shut down.
+     * Failures are logged and not propagated.
+     *
+     * @param cluster cluster to connect to; its bootstraps must not be {@code null}
+     */
+    private void createTubeConsumerCreator(CacheZoneCluster cluster) {
+        LOGGER.info("start to init tube creator for cluster={}", cluster);
+        if (cluster.getBootstraps() == null) {
+            LOGGER.error("bootstrap is null for cluster={}", cluster);
+            return;
+        }
+        try {
+            TubeClientConfig tubeConfig = new TubeClientConfig(cluster.getBootstraps());
+            MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+            TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory, tubeConfig);
+            TubeConsumerCreator oldCreator = tubeFactories.putIfAbsent(cluster.getClusterId(), tubeConsumerCreator);
+            if (oldCreator != null) {
+                // lost the race against a concurrent registration; release the redundant session factory
+                LOGGER.warn("close new tube creator for cluster={}", cluster);
+                tubeConsumerCreator.getMessageSessionFactory().shutdown();
+            }
+        } catch (Exception e) {
+            LOGGER.error("create tube creator error for cluster={}", cluster, e);
+            return;
+        }
+        LOGGER.info("success to init tube creator for cluster={}", cluster);
+    }
+
+    /**
+     * Submits a task to {@code pool} that creates a pulsar client for every assigned pulsar cluster that
+     * has no entry in {@code pulsarClients} yet.
+     */
+    private void onlinePulsarClients() {
+        final List<CacheZoneCluster> missing = new ArrayList<>();
+        for (CacheZoneCluster assigned : this.getCacheZoneClusters(InlongTopicTypeEnum.PULSAR)) {
+            if (!pulsarClients.containsKey(assigned.getClusterId())) {
+                missing.add(assigned);
+            }
+        }
+        pool.submit(() -> missing.stream().parallel().forEach(this::createPulsarClient));
+    }
+
+    /**
+     * Creates a pulsar client for the cluster and registers it in {@code pulsarClients}.
+     * <p>
+     * If a client is already registered for the cluster id, the outcome depends on its state: a client that
+     * is still open is kept and the new one is closed; a client that is already closed is replaced by the
+     * new one, so that no dead client stays in the map and the new client is not leaked.
+     * Failures are logged and not propagated.
+     *
+     * @param cluster cluster to connect to; its bootstraps must not be {@code null}
+     */
+    private void createPulsarClient(CacheZoneCluster cluster) {
+        LOGGER.info("start to init pulsar client for cluster={}", cluster);
+        if (cluster.getBootstraps() == null) {
+            LOGGER.error("bootstrap is null for cluster={}", cluster);
+            return;
+        }
+        try {
+            PulsarClient pulsarClient = PulsarClient.builder()
+                    .serviceUrl(cluster.getBootstraps())
+                    .authentication(AuthenticationFactory.token(cluster.getToken()))
+                    .build();
+            String clusterId = cluster.getClusterId();
+            PulsarClient oldClient = pulsarClients.putIfAbsent(clusterId, pulsarClient);
+            if (oldClient != null) {
+                if (oldClient.isClosed() && pulsarClients.replace(clusterId, oldClient, pulsarClient)) {
+                    LOGGER.warn("replace closed pulsar client for cluster={}", cluster);
+                } else {
+                    // a usable client is already registered (or the entry changed meanwhile); drop the new one
+                    LOGGER.warn("close new pulsar client for cluster={}", cluster);
+                    pulsarClient.close();
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("create pulsar client error for cluster={}", cluster, e);
+            return;
+        }
+        LOGGER.info("success to init pulsar client for cluster={}", cluster);
+    }
+
+    /**
+     * Collects the distinct clusters of all assigned topics whose type matches the given one
+     * (case-insensitive).
+     *
+     * @param type topic type to select
+     * @return the distinct clusters of the matching topics in {@code assignedTopics}
+     */
+    private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum type) {
+        final List<CacheZoneCluster> clusters = new ArrayList<>();
+        for (InLongTopic assigned : assignedTopics) {
+            if (!type.getName().equalsIgnoreCase(assigned.getTopicType())) {
+                continue;
+            }
+            final CacheZoneCluster candidate = assigned.getInLongCluster();
+            // keep only the first occurrence of equal clusters
+            if (!clusters.contains(candidate)) {
+                clusters.add(candidate);
+            }
+        }
+        return clusters;
+    }
+
+    /**
+     * Makes sure the cluster-level client needed by the topic exists: a pulsar client for pulsar topics and
+     * a tube consumer creator for tube topics.
+     * <p>
+     * Kafka topics need no cluster-level client here because onlineNewTopic passes the cluster bootstraps
+     * straight to the kafka builder, so they are accepted silently. Any other type is logged as unsupported.
+     *
+     * @param topic topic about to be brought online
+     */
+    private void checkAndOnlineCluster(InLongTopic topic) {
+        switch (topic.getTopicType().toLowerCase()) {
+            case "pulsar":
+                if (!pulsarClients.containsKey(topic.getInLongCluster().getClusterId())) {
+                    createPulsarClient(topic.getInLongCluster());
+                }
+                return;
+            case "tube":
+                if (!tubeFactories.containsKey(topic.getInLongCluster().getClusterId())) {
+                    createTubeConsumerCreator(topic.getInLongCluster());
+                }
+                return;
+            default:
+                // kafka is supported by onlineNewTopic and must not be reported as an error
+                if (!InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topic.getTopicType())) {
+                    LOGGER.error("do not support type={}", topic.getTopicType());
+                }
+        }
+    }
+
+    /**
+     * Builds and subscribes a fetcher matching the topic's type (pulsar, kafka or tube; compared
+     * case-insensitively).
+     *
+     * @param topic topic to subscribe
+     * @return the subscribed fetcher, or {@code null} when the type is not supported or subscribing threw
+     */
+    private TopicFetcher onlineNewTopic(InLongTopic topic) {
+        try {
+            final String topicType = topic.getTopicType();
+
+            if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topicType)) {
+                LOGGER.info("the topic is pulsar {}", topic);
+                return TopicFetcherBuilder.newPulsarBuilder()
+                        .pulsarClient(pulsarClients.get(topic.getInLongCluster().getClusterId()))
+                        .topic(topic)
+                        .context(context)
+                        .subscribe();
+            }
+
+            if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topicType)) {
+                LOGGER.info("the topic is kafka {}", topic);
+                return TopicFetcherBuilder.newKafkaBuilder()
+                        .bootstrapServers(topic.getInLongCluster().getBootstraps())
+                        .topic(topic)
+                        .context(context)
+                        .subscribe();
+            }
+
+            if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topicType)) {
+                LOGGER.info("the topic is tube {}", topic);
+                return TopicFetcherBuilder.newTubeBuilder()
+                        .tubeConsumerCreater(tubeFactories.get(topic.getInLongCluster().getClusterId()))
+                        .topic(topic)
+                        .context(context)
+                        .subscribe();
+            }
+
+            LOGGER.error("topic type not support " + topicType);
+            return null;
+        } catch (Exception e) {
+            LOGGER.error("failed to subscribe new topic={}", topic, e);
+            return null;
+        }
+    }
+
+    /**
+     * Submits a task to {@code pool} that calls addTopic, in parallel, for every topic returned by
+     * getOnlineTopics.
+     */
+    private void onlineNewTopics() {
+        pool.submit(() -> {
+            final List<InLongTopic> pending = getOnlineTopics();
+            pending.stream().parallel().forEach(this::addTopic);
+        });
+    }
+
+    /**
+     * Submits a task to {@code pool} that calls updateTopic, in parallel, for every topic returned by
+     * getUpdateTopics.
+     */
+    private void updateCurrentTopics() {
+        pool.submit(() -> {
+            final List<InLongTopic> current = getUpdateTopics();
+            current.stream().parallel().forEach(this::updateTopic);
+        });
+    }
+
+    /**
+     * Hands the topic to the fetcher registered under its key; logs a warning when there is no such fetcher.
+     *
+     * @param topic topic whose fetcher should receive the update
+     */
+    private void updateTopic(InLongTopic topic) {
+        final TopicFetcher target = fetchers.get(topic.getTopicKey());
+        if (target != null) {
+            target.updateTopics(Collections.singletonList(topic));
+            return;
+        }
+        LOGGER.warn("when update topic, find no topic={}", topic);
+    }
+
+    /**
+     * Selects the assigned topics that have no fetcher registered under their topic key.
+     *
+     * @return the distinct topics of {@code assignedTopics} whose key is absent from {@code fetchers}
+     */
+    private List<InLongTopic> getOnlineTopics() {
+        final List<InLongTopic> pending = new ArrayList<>();
+        for (InLongTopic assigned : assignedTopics) {
+            // skip topics that already have a fetcher, and keep only the first of equal topics
+            if (!fetchers.containsKey(assigned.getTopicKey()) && !pending.contains(assigned)) {
+                pending.add(assigned);
+            }
+        }
+        return pending;
+    }
+
+    /**
+     * Submits a task to {@code pool} that removes every fetcher whose topic key is no longer among the
+     * assigned topics.
+     * <p>
+     * The candidates are derived from the keys of {@code fetchers}: a topic that has been taken out of the
+     * assignment is, by definition, no longer present in {@code assignedTopics}, so it can only be found
+     * through the fetcher that is still registered for it.
+     */
+    private void offlineRemovedTopics() {
+        pool.submit(() -> {
+            Set<String> assignedKeys = assignedTopics.stream()
+                    .map(InLongTopic::getTopicKey)
+                    .collect(Collectors.toSet());
+            // iterate a snapshot because removing a topic modifies fetchers
+            Set<String> registeredKeys = new HashSet<>(fetchers.keySet());
+            registeredKeys.stream().parallel()
+                    .filter(topicKey -> !assignedKeys.contains(topicKey))
+                    .forEach(this::removeTopic);
+        });
+    }
+
+    /**
+     * Returns the distinct assigned topics whose key is not registered in {@code fetchers}.
+     * <p>
+     * NOTE(review): despite the name, the result is built from {@code assignedTopics} only, so it contains
+     * assigned topics without a fetcher rather than fetchers whose topic is no longer assigned - confirm
+     * whether that is intended.
+     *
+     * @return assigned topics that have no entry in {@code fetchers}
+     */
+    private List<InLongTopic> getOfflineTopics() {
+        // first pass: keys of assigned topics that already have a fetcher
+        final Set<String> keysWithFetcher = new HashSet<>();
+        for (InLongTopic assigned : assignedTopics) {
+            final String topicKey = assigned.getTopicKey();
+            if (fetchers.containsKey(topicKey)) {
+                keysWithFetcher.add(topicKey);
+            }
+        }
+
+        // second pass: assigned topics whose key was not found above
+        final List<InLongTopic> result = new ArrayList<>();
+        for (InLongTopic assigned : assignedTopics) {
+            if (!keysWithFetcher.contains(assigned.getTopicKey()) && !result.contains(assigned)) {
+                result.add(assigned);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Selects the assigned topics that already have a fetcher registered under their topic key.
+     *
+     * @return the distinct topics of {@code assignedTopics} whose key is present in {@code fetchers}
+     */
+    private List<InLongTopic> getUpdateTopics() {
+        final List<InLongTopic> current = new ArrayList<>();
+        for (InLongTopic assigned : assignedTopics) {
+            // keep topics that have a fetcher, and only the first of equal topics
+            if (fetchers.containsKey(assigned.getTopicKey()) && !current.contains(assigned)) {
+                current.add(assigned);
+            }
+        }
+        return current;
+    }
+}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
index 1282fea5c7..d85db642d8 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkMetricItem.java
@@ -58,9 +58,12 @@ public class SortSdkMetricItem extends MetricItem {
     public static final String M_REQUEST_MANAGER_COUNT = "requestManagerCount";
     public static final String M_REQUEST_MANAGER_TIME_COST = 
"requestManagerTimeCost";
     public static final String M_REQUEST_MANAGER_FAIL_COUNT = 
"requestManagerFailCount";
-    public static final String M_REQUEST_MANAGER_CONF_CHANAGED_COUNT = 
"requestManagerConfChanagedCount";
-    public static final String M_RQUEST_MANAGER_COMMON_ERROR_COUNT = 
"requestManagerCommonErrorCount";
-    public static final String M_RQUEST_MANAGER_PARAM_ERROR_COUNT = 
"requestManagerParamErrorCount";
+    public static final String M_REQEUST_MANAGER_EMPTY_COUNT = 
"requestManagerEmptyCount";
+    public static final String 
M_REQUEST_MANAGER_TOPICS_CHANGE_OUT_OF_THRESHOLD =
+            "requestManagerTopicsChangeOutOfThreshold";
+    public static final String M_REQUEST_MANAGER_CONF_CHANGED_COUNT = 
"requestManagerConfChangedCount";
+    public static final String M_REQUEST_MANAGER_COMMON_ERROR_COUNT = 
"requestManagerCommonErrorCount";
+    public static final String M_REQUEST_MANAGER_PARAM_ERROR_COUNT = 
"requestManagerParamErrorCount";
 
     @Dimension
     public String sortTaskId;
@@ -113,6 +116,10 @@ public class SortSdkMetricItem extends MetricItem {
     public AtomicLong requestManagerCommonErrorCount = new AtomicLong(0);
     @CountMetric
     public AtomicLong requestManagerParamErrorCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong reqeustManagerEmptyCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong requestManagerTopicsChangeOutOfThreshold = new 
AtomicLong(0);
 
     public SortSdkMetricItem() {
 
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
index 95898de333..9ec89ea066 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/metrics/SortSdkPrometheusMetricListener.java
@@ -87,11 +87,12 @@ public class SortSdkPrometheusMetricListener extends 
Collector implements Metric
         metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_COUNT, 
metricItem.requestManagerCount);
         metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_TIME_COST, 
metricItem.requestManagerTimeCost);
         metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_FAIL_COUNT, 
metricItem.requestManagerFailCount);
-        
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_CONF_CHANAGED_COUNT,
+        metricValueMap.put(SortSdkMetricItem.M_REQEUST_MANAGER_EMPTY_COUNT, 
metricItem.reqeustManagerEmptyCount);
+        
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_CONF_CHANGED_COUNT,
                 metricItem.requestManagerConfChangedCount);
-        
metricValueMap.put(SortSdkMetricItem.M_RQUEST_MANAGER_COMMON_ERROR_COUNT,
+        
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_COMMON_ERROR_COUNT,
                 metricItem.requestManagerCommonErrorCount);
-        
metricValueMap.put(SortSdkMetricItem.M_RQUEST_MANAGER_PARAM_ERROR_COUNT,
+        
metricValueMap.put(SortSdkMetricItem.M_REQUEST_MANAGER_PARAM_ERROR_COUNT,
                 metricItem.requestManagerParamErrorCount);
 
         this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
@@ -123,12 +124,14 @@ public class SortSdkPrometheusMetricListener extends 
Collector implements Metric
                 metricItem.requestManagerTimeCost.get());
         
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_FAIL_COUNT),
                 metricItem.requestManagerFailCount.get());
-        
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_CONF_CHANAGED_COUNT),
+        
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_CONF_CHANGED_COUNT),
                 metricItem.requestManagerConfChangedCount.get());
-        
totalCounter.addMetric(Collections.singletonList(M_RQUEST_MANAGER_PARAM_ERROR_COUNT),
+        
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_PARAM_ERROR_COUNT),
                 metricItem.requestManagerParamErrorCount.get());
-        
totalCounter.addMetric(Collections.singletonList(M_RQUEST_MANAGER_COMMON_ERROR_COUNT),
+        
totalCounter.addMetric(Collections.singletonList(M_REQUEST_MANAGER_COMMON_ERROR_COUNT),
                 metricItem.requestManagerCommonErrorCount.get());
+        
totalCounter.addMetric(Collections.singletonList(M_REQEUST_MANAGER_EMPTY_COUNT),
+                metricItem.reqeustManagerEmptyCount.get());
         List<MetricFamilySamples> mfs = new ArrayList<>();
         mfs.add(totalCounter);
         return mfs;

Reply via email to