This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e8c55fa1c [INLONG-8758][DataProxy] Metadata synchronization management optimization (#8759)
7e8c55fa1c is described below

commit 7e8c55fa1c89c3878029296782afccd3366c2bf6
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Aug 18 09:24:40 2023 +0800

    [INLONG-8758][DataProxy] Metadata synchronization management optimization (#8759)
---
 .../dataproxy/config/CommonConfigHolder.java       |  13 +++
 .../inlong/dataproxy/config/ConfigManager.java     |  35 +++++--
 .../dataproxy/config/holder/MetaConfigHolder.java  | 116 ++++++++++++++-------
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      |   2 +-
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    |   4 +-
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java |   2 +-
 .../config/holder/TestMetaConfigHolder.java        |   4 +-
 7 files changed, 123 insertions(+), 53 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d17bb70a08..b69b8d98c8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -76,6 +76,9 @@ public class CommonConfigHolder {
     private static final String KEY_CONFIG_CHECK_INTERVAL_MS = 
"configCheckInterval";
     public static final long VAL_DEF_CONFIG_SYNC_INTERVAL_MS = 60000L;
     public static final long VAL_MIN_CONFIG_SYNC_INTERVAL_MS = 10000L;
+    // max allowed wait duration
+    private static final String KEY_META_CONFIG_SYNC_WAST_ALARM_MS = 
"meta.config.sync.wast.alarm.ms";
+    public static final long VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS = 30000L;
     // whether to startup using the local metadata.json file without 
connecting to the Manager
     private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE =
             "startup.using.local.meta.file.enable";
@@ -165,6 +168,7 @@ public class CommonConfigHolder {
     private String managerAuthSecretKey = "";
     private boolean enableStartupUsingLocalMetaFile = 
VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE;
     private long metaConfigSyncInvlMs = VAL_DEF_CONFIG_SYNC_INTERVAL_MS;
+    private long metaConfigWastAlarmMs = 
VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS;
     private boolean enableAudit = VAL_DEF_ENABLE_AUDIT;
     private final HashSet<String> auditProxys = new HashSet<>();
     private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
@@ -253,6 +257,10 @@ public class CommonConfigHolder {
         return metaConfigSyncInvlMs;
     }
 
+    public long getMetaConfigWastAlarmMs() {
+        return metaConfigWastAlarmMs;
+    }
+
     public boolean isEnableUnConfigTopicAccept() {
         return enableUnConfigTopicAccept;
     }
@@ -409,6 +417,11 @@ public class CommonConfigHolder {
                 this.metaConfigSyncInvlMs = tmpSyncInvMs;
             }
         }
+        // read configure sync wast alarm ms
+        tmpValue = this.props.get(KEY_META_CONFIG_SYNC_WAST_ALARM_MS);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.metaConfigWastAlarmMs = NumberUtils.toLong(tmpValue.trim(), 
VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS);
+        }
         // read enable startup using local meta file
         tmpValue = this.props.get(KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE);
         if (StringUtils.isNotEmpty(tmpValue)) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index e37a1e8925..c72af67ae4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -124,14 +124,17 @@ public class ConfigManager {
     }
 
     /**
-     * get topic by groupId and streamId
+     * get source topic by groupId and streamId
      */
     public String getTopicName(String groupId, String streamId) {
-        return metaConfigHolder.getBaseTopicName(groupId, streamId);
+        return metaConfigHolder.getSrcBaseTopicName(groupId, streamId);
     }
 
-    public IdTopicConfig getIdTopicConfig(String groupId, String streamId) {
-        return metaConfigHolder.getIdTopicConfig(groupId, streamId);
+    /**
+     * get sink topic configure by groupId and streamId
+     */
+    public IdTopicConfig getSinkIdTopicConfig(String groupId, String streamId) 
{
+        return metaConfigHolder.getSinkIdTopicConfig(groupId, streamId);
     }
 
     public String getMetaConfigMD5() {
@@ -346,17 +349,29 @@ public class ConfigManager {
                     request.setMd5(configManager.getMetaConfigMD5());
                 }
                 httpPost.setEntity(HttpUtils.getEntity(request));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Start to request {} to get config info, with 
params: {}, headers: {}",
+                            url, request, httpPost.getAllHeaders());
+                }
                 // request with post
-                LOG.info("Start to request {} to get config info, with params: 
{}, headers: {}",
-                        url, request, httpPost.getAllHeaders());
+                long startTime = System.currentTimeMillis();
                 CloseableHttpResponse response = httpClient.execute(httpPost);
                 String returnStr = EntityUtils.toString(response.getEntity());
+                long dltTime = System.currentTimeMillis() - startTime;
+                if (dltTime >= 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
+                    LOG.warn("End to request {} to get config info:{}, WAIST 
{} ms",
+                            url, returnStr, dltTime);
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("End to request {} to get config info:{}, 
WAIST {} ms",
+                                url, returnStr, dltTime);
+                    }
+                }
                 if (response.getStatusLine().getStatusCode() != 200) {
                     LOG.warn("Failed to request {}, with params: {}, headers: 
{}, the response is {}",
                             url, request, httpPost.getAllHeaders(), returnStr);
                     return false;
                 }
-                LOG.info("End to request {} to get config info:{}", url, 
returnStr);
                 // get groupId <-> topic and m value.
                 DataProxyConfigResponse proxyResponse =
                         gson.fromJson(returnStr, 
DataProxyConfigResponse.class);
@@ -382,8 +397,10 @@ public class ConfigManager {
                 }
                 // update meta configure
                 if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(), 
returnStr)) {
-                    ConfigManager.handshakeManagerOk.set(true);
-                    LOG.info("Get config success from manager and updated, set 
handshake status is ok!");
+                    if (!ConfigManager.handshakeManagerOk.get()) {
+                        ConfigManager.handshakeManagerOk.set(true);
+                        LOG.info("Get config success from manager and updated, 
set handshake status is ok!");
+                    }
                 }
                 return true;
             } catch (Throwable ex) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 6d0da8d5ea..5cc4d652b3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -63,8 +63,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class MetaConfigHolder extends ConfigHolder {
 
     private static final String metaConfigFileName = "metadata.json";
-    private static final long MAX_SYNC_WAIT_TIME_MS =
-            CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs() * 2 + 
5000L;
     private static final int MAX_ALLOWED_JSON_FILE_SIZE = 300 * 1024 * 1024;
     private static final Logger LOG = 
LoggerFactory.getLogger(MetaConfigHolder.class);
     private static final Gson GSON = new Gson();
@@ -72,13 +70,15 @@ public class MetaConfigHolder extends ConfigHolder {
     // meta data
     private String dataMd5 = "";
     private String dataStr = "";
+    private final AtomicLong lastUpdVersion = new AtomicLong(0);
     private String tmpDataMd5 = "";
-    private final AtomicLong lastSyncTime = new AtomicLong(0);
+    private final AtomicLong lastSyncVersion = new AtomicLong(0);
     // cached data
     private final List<String> defTopics = new ArrayList<>();
     private final AtomicInteger clusterType = new 
AtomicInteger(CacheType.N.getId());
     private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap = 
new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, IdTopicConfig> id2TopicMap = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap = 
new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSinkMap = 
new ConcurrentHashMap<>();
 
     public MetaConfigHolder() {
         super(metaConfigFileName);
@@ -92,22 +92,22 @@ public class MetaConfigHolder extends ConfigHolder {
     }
 
     /**
-     * get topic by groupId and streamId
+     * get source topic by groupId and streamId
      */
-    public String getBaseTopicName(String groupId, String streamId) {
-        IdTopicConfig idTopicConfig = getIdTopicConfig(groupId, streamId);
+    public String getSrcBaseTopicName(String groupId, String streamId) {
+        IdTopicConfig idTopicConfig = getSrcIdTopicConfig(groupId, streamId);
         if (idTopicConfig == null) {
             return null;
         }
         return idTopicConfig.getTopicName();
     }
 
-    public IdTopicConfig getIdTopicConfig(String groupId, String streamId) {
+    public IdTopicConfig getSrcIdTopicConfig(String groupId, String streamId) {
         IdTopicConfig idTopicConfig = null;
-        if (StringUtils.isNotEmpty(groupId) && !id2TopicMap.isEmpty()) {
-            idTopicConfig = id2TopicMap.get(InlongId.generateUid(groupId, 
streamId));
+        if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
+            idTopicConfig = id2TopicSrcMap.get(InlongId.generateUid(groupId, 
streamId));
             if (idTopicConfig == null) {
-                idTopicConfig = id2TopicMap.get(groupId);
+                idTopicConfig = id2TopicSrcMap.get(groupId);
             }
         }
         if (LOG.isDebugEnabled()) {
@@ -120,12 +120,12 @@ public class MetaConfigHolder extends ConfigHolder {
     /**
      * get topic by groupId and streamId
      */
-    public String getTopicName(String groupId, String streamId) {
+    public String getSourceTopicName(String groupId, String streamId) {
         String topic = null;
-        if (StringUtils.isNotEmpty(groupId) && !id2TopicMap.isEmpty()) {
-            IdTopicConfig idTopicConfig = 
id2TopicMap.get(InlongId.generateUid(groupId, streamId));
+        if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
+            IdTopicConfig idTopicConfig = 
id2TopicSrcMap.get(InlongId.generateUid(groupId, streamId));
             if (idTopicConfig == null) {
-                idTopicConfig = id2TopicMap.get(groupId);
+                idTopicConfig = id2TopicSrcMap.get(groupId);
             }
             if (idTopicConfig != null) {
                 topic = idTopicConfig.getTopicName();
@@ -138,24 +138,53 @@ public class MetaConfigHolder extends ConfigHolder {
         return topic;
     }
 
+    public IdTopicConfig getSinkIdTopicConfig(String groupId, String streamId) 
{
+        IdTopicConfig idTopicConfig = null;
+        if (StringUtils.isNotEmpty(groupId) && !id2TopicSinkMap.isEmpty()) {
+            idTopicConfig = id2TopicSinkMap.get(InlongId.generateUid(groupId, 
streamId));
+            if (idTopicConfig == null) {
+                idTopicConfig = id2TopicSinkMap.get(groupId);
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get Sink Topic Config by groupId = {}, streamId = {}, 
IdTopicConfig = {}",
+                    groupId, streamId, idTopicConfig);
+        }
+        return idTopicConfig;
+    }
+
     public String getConfigMd5() {
-        return (System.currentTimeMillis() - lastSyncTime.get() >= 
MAX_SYNC_WAIT_TIME_MS)
-                ? dataMd5
-                : tmpDataMd5;
+        synchronized (this.lastUpdVersion) {
+            if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
+                return tmpDataMd5;
+            } else {
+                return dataMd5;
+            }
+        }
     }
 
     public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
         if (StringUtils.isBlank(inDataMd5)
-                || StringUtils.isBlank(inDataJsonStr)
-                || inDataMd5.equalsIgnoreCase(dataMd5)) {
+                || StringUtils.isBlank(inDataJsonStr)) {
             return false;
         }
-        if (storeConfigToFile(inDataJsonStr)) {
-            tmpDataMd5 = inDataMd5;
-            lastSyncTime.set(System.currentTimeMillis());
-            return true;
+        synchronized (this.lastSyncVersion) {
+            synchronized (this.lastUpdVersion) {
+                if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
+                    if (inDataJsonStr.equals(tmpDataMd5)) {
+                        return false;
+                    }
+                    LOG.info("Load changed metadata {} , but reloading 
content, over {} ms",
+                            getFileName(), System.currentTimeMillis() - 
this.lastSyncVersion.get());
+                    return false;
+                } else {
+                    if (inDataMd5.equalsIgnoreCase(dataMd5)) {
+                        return false;
+                    }
+                }
+            }
+            return storeConfigToFile(inDataMd5, inDataJsonStr);
         }
-        return false;
     }
 
     public List<CacheClusterConfig> forkCachedCLusterConfig() {
@@ -183,7 +212,7 @@ public class MetaConfigHolder extends ConfigHolder {
             result.addAll(CommonConfigHolder.getInstance().getDefTopics());
         }
         // add configured topics
-        for (IdTopicConfig topicConfig : id2TopicMap.values()) {
+        for (IdTopicConfig topicConfig : id2TopicSrcMap.values()) {
             if (topicConfig == null) {
                 continue;
             }
@@ -223,18 +252,25 @@ public class MetaConfigHolder extends ConfigHolder {
             // update cache data
             if (updateCacheData(clusterObj)) {
                 // update cache string
-                this.dataMd5 = metaConfig.getMd5();
-                this.tmpDataMd5 = metaConfig.getMd5();
-                this.lastSyncTime.set(System.currentTimeMillis());
-                this.dataStr = jsonString;
-                LOG.info("Load changed json {}, loaded dataMd5 {}, loaded data 
{}, updated cache ({}, {})",
-                        getFileName(), dataMd5, dataStr, id2TopicMap, 
mqClusterMap);
+                synchronized (this.lastUpdVersion) {
+                    if (this.lastSyncVersion.get() == 0) {
+                        this.lastUpdVersion.set(System.currentTimeMillis());
+                        this.lastSyncVersion.compareAndSet(0, 
this.lastUpdVersion.get());
+                    } else {
+                        this.lastUpdVersion.set(this.lastSyncVersion.get());
+                    }
+                    this.dataMd5 = metaConfig.getMd5();
+                    this.dataStr = jsonString;
+                }
+                LOG.info(
+                        "Load changed {}, loaded dataMd5={}, data={}, 
id2TopicSrcMap={}, mqClusterMap={}, id2TopicSinkMap={}",
+                        getFileName(), dataMd5, dataStr, id2TopicSrcMap, 
mqClusterMap, id2TopicSinkMap);
                 return true;
             }
             return false;
         } catch (Throwable e) {
             //
-            LOG.info("Process json {} changed data {} failure", getFileName(), 
jsonString, e);
+            LOG.warn("Process json {} changed data {} failure", getFileName(), 
jsonString, e);
             return false;
         } finally {
             readWriteLock.readLock().unlock();
@@ -300,7 +336,7 @@ public class MetaConfigHolder extends ConfigHolder {
         this.clusterType.getAndSet(cacheType.getId());
         // remove deleted id2topic config
         Set<String> tmpKeys = new HashSet<>();
-        for (Map.Entry<String, IdTopicConfig> entry : id2TopicMap.entrySet()) {
+        for (Map.Entry<String, IdTopicConfig> entry : 
id2TopicSrcMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
                 continue;
             }
@@ -309,10 +345,12 @@ public class MetaConfigHolder extends ConfigHolder {
             }
         }
         for (String key : tmpKeys) {
-            id2TopicMap.remove(key);
+            id2TopicSrcMap.remove(key);
         }
-        // add new id2topic config
-        id2TopicMap.putAll(topicConfigMap);
+        // add new id2topic source config
+        id2TopicSrcMap.putAll(topicConfigMap);
+        // add new id2topic sink config
+        id2TopicSinkMap.putAll(topicConfigMap);
         // remove deleted cluster config
         tmpKeys.clear();
         for (Map.Entry<String, CacheClusterConfig> entry : 
mqClusterMap.entrySet()) {
@@ -412,7 +450,7 @@ public class MetaConfigHolder extends ConfigHolder {
     /**
      * store meta config to file
      */
-    private boolean storeConfigToFile(String metaJsonStr) {
+    private boolean storeConfigToFile(String inDataMd5, String metaJsonStr) {
         boolean isSuccess = false;
         String filePath = getFilePath();
         if (StringUtils.isBlank(filePath)) {
@@ -431,6 +469,8 @@ public class MetaConfigHolder extends ConfigHolder {
             FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, 
StandardCharsets.UTF_8);
             FileUtils.copyFile(tmpNewFile, sourceFile);
             tmpNewFile.delete();
+            tmpDataMd5 = inDataMd5;
+            lastSyncVersion.set(System.currentTimeMillis());
             isSuccess = true;
             setFileChanged();
         } catch (Throwable ex) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index e10ff68dd9..b282cc31ca 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -121,7 +121,7 @@ public class KafkaHandler implements MessageQueueHandler {
         String topic = null;
         try {
             // get idConfig
-            IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
+            IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
                 if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 1b76eaf72a..650aafb637 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -206,7 +206,7 @@ public class PulsarHandler implements MessageQueueHandler {
         String producerTopic = null;
         try {
             // get idConfig
-            IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
+            IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
                 if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
@@ -354,7 +354,7 @@ public class PulsarHandler implements MessageQueueHandler {
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                sinkContext.fileMetricAddFailStats(simpleProfile, 
producerTopic, "", producerTopic);
+                sinkContext.fileMetricAddFailStats(simpleProfile, 
producerTopic, msgId.toString(), producerTopic);
                 sinkContext.processSendFail(simpleProfile, clusterName, 
producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index b363f79804..60b3910dc0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -186,7 +186,7 @@ public class TubeHandler implements MessageQueueHandler {
         String topic = null;
         try {
             // idConfig
-            IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
+            IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
                 if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
index 5faf4499c7..b12e952dac 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
@@ -38,8 +38,8 @@ public class TestMetaConfigHolder {
         Assert.assertTrue(result);
         Assert.assertEquals(metaConfigHolder.getConfigMd5(), 
"5a3f5939bb7368f493bf41c1d785b8f3");
         Assert.assertEquals("test_group",
-                metaConfigHolder.getTopicName("test_group", "stream1"));
-        Assert.assertNull(metaConfigHolder.getTopicName("aaa", "stream1"));
+                metaConfigHolder.getSourceTopicName("test_group", "stream1"));
+        Assert.assertNull(metaConfigHolder.getSourceTopicName("aaa", 
"stream1"));
         List<CacheClusterConfig> clusterConfigs = 
metaConfigHolder.forkCachedCLusterConfig();
         Assert.assertEquals(1, clusterConfigs.size());
         Assert.assertEquals("test_tubemq", 
clusterConfigs.get(0).getClusterName());

Reply via email to