This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7e8c55fa1c [INLONG-8758][DataProxy] Metadata synchronization
management optimization (#8759)
7e8c55fa1c is described below
commit 7e8c55fa1c89c3878029296782afccd3366c2bf6
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Aug 18 09:24:40 2023 +0800
[INLONG-8758][DataProxy] Metadata synchronization management optimization
(#8759)
---
.../dataproxy/config/CommonConfigHolder.java | 13 +++
.../inlong/dataproxy/config/ConfigManager.java | 35 +++++--
.../dataproxy/config/holder/MetaConfigHolder.java | 116 ++++++++++++++-------
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 2 +-
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 4 +-
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 2 +-
.../config/holder/TestMetaConfigHolder.java | 4 +-
7 files changed, 123 insertions(+), 53 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d17bb70a08..b69b8d98c8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -76,6 +76,9 @@ public class CommonConfigHolder {
private static final String KEY_CONFIG_CHECK_INTERVAL_MS =
"configCheckInterval";
public static final long VAL_DEF_CONFIG_SYNC_INTERVAL_MS = 60000L;
public static final long VAL_MIN_CONFIG_SYNC_INTERVAL_MS = 10000L;
+ // max allowed wait duration
+ private static final String KEY_META_CONFIG_SYNC_WAST_ALARM_MS =
"meta.config.sync.wast.alarm.ms";
+ public static final long VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS = 30000L;
// whether to startup using the local metadata.json file without
connecting to the Manager
private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE =
"startup.using.local.meta.file.enable";
@@ -165,6 +168,7 @@ public class CommonConfigHolder {
private String managerAuthSecretKey = "";
private boolean enableStartupUsingLocalMetaFile =
VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE;
private long metaConfigSyncInvlMs = VAL_DEF_CONFIG_SYNC_INTERVAL_MS;
+ private long metaConfigWastAlarmMs =
VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS;
private boolean enableAudit = VAL_DEF_ENABLE_AUDIT;
private final HashSet<String> auditProxys = new HashSet<>();
private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
@@ -253,6 +257,10 @@ public class CommonConfigHolder {
return metaConfigSyncInvlMs;
}
+ public long getMetaConfigWastAlarmMs() {
+ return metaConfigWastAlarmMs;
+ }
+
public boolean isEnableUnConfigTopicAccept() {
return enableUnConfigTopicAccept;
}
@@ -409,6 +417,11 @@ public class CommonConfigHolder {
this.metaConfigSyncInvlMs = tmpSyncInvMs;
}
}
+ // read configure sync wast alarm ms
+ tmpValue = this.props.get(KEY_META_CONFIG_SYNC_WAST_ALARM_MS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.metaConfigWastAlarmMs = NumberUtils.toLong(tmpValue.trim(),
VAL_DEF_META_CONFIG_SYNC_WAST_ALARM_MS);
+ }
// read enable startup using local meta file
tmpValue = this.props.get(KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE);
if (StringUtils.isNotEmpty(tmpValue)) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index e37a1e8925..c72af67ae4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -124,14 +124,17 @@ public class ConfigManager {
}
/**
- * get topic by groupId and streamId
+ * get source topic by groupId and streamId
*/
public String getTopicName(String groupId, String streamId) {
- return metaConfigHolder.getBaseTopicName(groupId, streamId);
+ return metaConfigHolder.getSrcBaseTopicName(groupId, streamId);
}
- public IdTopicConfig getIdTopicConfig(String groupId, String streamId) {
- return metaConfigHolder.getIdTopicConfig(groupId, streamId);
+ /**
+ * get sink topic configure by groupId and streamId
+ */
+ public IdTopicConfig getSinkIdTopicConfig(String groupId, String streamId)
{
+ return metaConfigHolder.getSinkIdTopicConfig(groupId, streamId);
}
public String getMetaConfigMD5() {
@@ -346,17 +349,29 @@ public class ConfigManager {
request.setMd5(configManager.getMetaConfigMD5());
}
httpPost.setEntity(HttpUtils.getEntity(request));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start to request {} to get config info, with
params: {}, headers: {}",
+ url, request, httpPost.getAllHeaders());
+ }
// request with post
- LOG.info("Start to request {} to get config info, with params:
{}, headers: {}",
- url, request, httpPost.getAllHeaders());
+ long startTime = System.currentTimeMillis();
CloseableHttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
+ long dltTime = System.currentTimeMillis() - startTime;
+ if (dltTime >=
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
+ LOG.warn("End to request {} to get config info:{}, WAIST
{} ms",
+ url, returnStr, dltTime);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("End to request {} to get config info:{},
WAIST {} ms",
+ url, returnStr, dltTime);
+ }
+ }
if (response.getStatusLine().getStatusCode() != 200) {
LOG.warn("Failed to request {}, with params: {}, headers:
{}, the response is {}",
url, request, httpPost.getAllHeaders(), returnStr);
return false;
}
- LOG.info("End to request {} to get config info:{}", url,
returnStr);
// get groupId <-> topic and m value.
DataProxyConfigResponse proxyResponse =
gson.fromJson(returnStr,
DataProxyConfigResponse.class);
@@ -382,8 +397,10 @@ public class ConfigManager {
}
// update meta configure
if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(),
returnStr)) {
- ConfigManager.handshakeManagerOk.set(true);
- LOG.info("Get config success from manager and updated, set
handshake status is ok!");
+ if (!ConfigManager.handshakeManagerOk.get()) {
+ ConfigManager.handshakeManagerOk.set(true);
+ LOG.info("Get config success from manager and updated,
set handshake status is ok!");
+ }
}
return true;
} catch (Throwable ex) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 6d0da8d5ea..5cc4d652b3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -63,8 +63,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MetaConfigHolder extends ConfigHolder {
private static final String metaConfigFileName = "metadata.json";
- private static final long MAX_SYNC_WAIT_TIME_MS =
- CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs() * 2 +
5000L;
private static final int MAX_ALLOWED_JSON_FILE_SIZE = 300 * 1024 * 1024;
private static final Logger LOG =
LoggerFactory.getLogger(MetaConfigHolder.class);
private static final Gson GSON = new Gson();
@@ -72,13 +70,15 @@ public class MetaConfigHolder extends ConfigHolder {
// meta data
private String dataMd5 = "";
private String dataStr = "";
+ private final AtomicLong lastUpdVersion = new AtomicLong(0);
private String tmpDataMd5 = "";
- private final AtomicLong lastSyncTime = new AtomicLong(0);
+ private final AtomicLong lastSyncVersion = new AtomicLong(0);
// cached data
private final List<String> defTopics = new ArrayList<>();
private final AtomicInteger clusterType = new
AtomicInteger(CacheType.N.getId());
private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap =
new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, IdTopicConfig> id2TopicMap = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap =
new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSinkMap =
new ConcurrentHashMap<>();
public MetaConfigHolder() {
super(metaConfigFileName);
@@ -92,22 +92,22 @@ public class MetaConfigHolder extends ConfigHolder {
}
/**
- * get topic by groupId and streamId
+ * get source topic by groupId and streamId
*/
- public String getBaseTopicName(String groupId, String streamId) {
- IdTopicConfig idTopicConfig = getIdTopicConfig(groupId, streamId);
+ public String getSrcBaseTopicName(String groupId, String streamId) {
+ IdTopicConfig idTopicConfig = getSrcIdTopicConfig(groupId, streamId);
if (idTopicConfig == null) {
return null;
}
return idTopicConfig.getTopicName();
}
- public IdTopicConfig getIdTopicConfig(String groupId, String streamId) {
+ public IdTopicConfig getSrcIdTopicConfig(String groupId, String streamId) {
IdTopicConfig idTopicConfig = null;
- if (StringUtils.isNotEmpty(groupId) && !id2TopicMap.isEmpty()) {
- idTopicConfig = id2TopicMap.get(InlongId.generateUid(groupId,
streamId));
+ if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
+ idTopicConfig = id2TopicSrcMap.get(InlongId.generateUid(groupId,
streamId));
if (idTopicConfig == null) {
- idTopicConfig = id2TopicMap.get(groupId);
+ idTopicConfig = id2TopicSrcMap.get(groupId);
}
}
if (LOG.isDebugEnabled()) {
@@ -120,12 +120,12 @@ public class MetaConfigHolder extends ConfigHolder {
/**
* get topic by groupId and streamId
*/
- public String getTopicName(String groupId, String streamId) {
+ public String getSourceTopicName(String groupId, String streamId) {
String topic = null;
- if (StringUtils.isNotEmpty(groupId) && !id2TopicMap.isEmpty()) {
- IdTopicConfig idTopicConfig =
id2TopicMap.get(InlongId.generateUid(groupId, streamId));
+ if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
+ IdTopicConfig idTopicConfig =
id2TopicSrcMap.get(InlongId.generateUid(groupId, streamId));
if (idTopicConfig == null) {
- idTopicConfig = id2TopicMap.get(groupId);
+ idTopicConfig = id2TopicSrcMap.get(groupId);
}
if (idTopicConfig != null) {
topic = idTopicConfig.getTopicName();
@@ -138,24 +138,53 @@ public class MetaConfigHolder extends ConfigHolder {
return topic;
}
+ public IdTopicConfig getSinkIdTopicConfig(String groupId, String streamId)
{
+ IdTopicConfig idTopicConfig = null;
+ if (StringUtils.isNotEmpty(groupId) && !id2TopicSinkMap.isEmpty()) {
+ idTopicConfig = id2TopicSinkMap.get(InlongId.generateUid(groupId,
streamId));
+ if (idTopicConfig == null) {
+ idTopicConfig = id2TopicSinkMap.get(groupId);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get Sink Topic Config by groupId = {}, streamId = {},
IdTopicConfig = {}",
+ groupId, streamId, idTopicConfig);
+ }
+ return idTopicConfig;
+ }
+
public String getConfigMd5() {
- return (System.currentTimeMillis() - lastSyncTime.get() >=
MAX_SYNC_WAIT_TIME_MS)
- ? dataMd5
- : tmpDataMd5;
+ synchronized (this.lastUpdVersion) {
+ if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
+ return tmpDataMd5;
+ } else {
+ return dataMd5;
+ }
+ }
}
public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
if (StringUtils.isBlank(inDataMd5)
- || StringUtils.isBlank(inDataJsonStr)
- || inDataMd5.equalsIgnoreCase(dataMd5)) {
+ || StringUtils.isBlank(inDataJsonStr)) {
return false;
}
- if (storeConfigToFile(inDataJsonStr)) {
- tmpDataMd5 = inDataMd5;
- lastSyncTime.set(System.currentTimeMillis());
- return true;
+ synchronized (this.lastSyncVersion) {
+ synchronized (this.lastUpdVersion) {
+ if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
+ if (inDataJsonStr.equals(tmpDataMd5)) {
+ return false;
+ }
+ LOG.info("Load changed metadata {} , but reloading
content, over {} ms",
+ getFileName(), System.currentTimeMillis() -
this.lastSyncVersion.get());
+ return false;
+ } else {
+ if (inDataMd5.equalsIgnoreCase(dataMd5)) {
+ return false;
+ }
+ }
+ }
+ return storeConfigToFile(inDataMd5, inDataJsonStr);
}
- return false;
}
public List<CacheClusterConfig> forkCachedCLusterConfig() {
@@ -183,7 +212,7 @@ public class MetaConfigHolder extends ConfigHolder {
result.addAll(CommonConfigHolder.getInstance().getDefTopics());
}
// add configured topics
- for (IdTopicConfig topicConfig : id2TopicMap.values()) {
+ for (IdTopicConfig topicConfig : id2TopicSrcMap.values()) {
if (topicConfig == null) {
continue;
}
@@ -223,18 +252,25 @@ public class MetaConfigHolder extends ConfigHolder {
// update cache data
if (updateCacheData(clusterObj)) {
// update cache string
- this.dataMd5 = metaConfig.getMd5();
- this.tmpDataMd5 = metaConfig.getMd5();
- this.lastSyncTime.set(System.currentTimeMillis());
- this.dataStr = jsonString;
- LOG.info("Load changed json {}, loaded dataMd5 {}, loaded data
{}, updated cache ({}, {})",
- getFileName(), dataMd5, dataStr, id2TopicMap,
mqClusterMap);
+ synchronized (this.lastUpdVersion) {
+ if (this.lastSyncVersion.get() == 0) {
+ this.lastUpdVersion.set(System.currentTimeMillis());
+ this.lastSyncVersion.compareAndSet(0,
this.lastUpdVersion.get());
+ } else {
+ this.lastUpdVersion.set(this.lastSyncVersion.get());
+ }
+ this.dataMd5 = metaConfig.getMd5();
+ this.dataStr = jsonString;
+ }
+ LOG.info(
+ "Load changed {}, loaded dataMd5={}, data={},
id2TopicSrcMap={}, mqClusterMap={}, id2TopicSinkMap={}",
+ getFileName(), dataMd5, dataStr, id2TopicSrcMap,
mqClusterMap, id2TopicSinkMap);
return true;
}
return false;
} catch (Throwable e) {
//
- LOG.info("Process json {} changed data {} failure", getFileName(),
jsonString, e);
+ LOG.warn("Process json {} changed data {} failure", getFileName(),
jsonString, e);
return false;
} finally {
readWriteLock.readLock().unlock();
@@ -300,7 +336,7 @@ public class MetaConfigHolder extends ConfigHolder {
this.clusterType.getAndSet(cacheType.getId());
// remove deleted id2topic config
Set<String> tmpKeys = new HashSet<>();
- for (Map.Entry<String, IdTopicConfig> entry : id2TopicMap.entrySet()) {
+ for (Map.Entry<String, IdTopicConfig> entry :
id2TopicSrcMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
continue;
}
@@ -309,10 +345,12 @@ public class MetaConfigHolder extends ConfigHolder {
}
}
for (String key : tmpKeys) {
- id2TopicMap.remove(key);
+ id2TopicSrcMap.remove(key);
}
- // add new id2topic config
- id2TopicMap.putAll(topicConfigMap);
+ // add new id2topic source config
+ id2TopicSrcMap.putAll(topicConfigMap);
+ // add new id2topic sink config
+ id2TopicSinkMap.putAll(topicConfigMap);
// remove deleted cluster config
tmpKeys.clear();
for (Map.Entry<String, CacheClusterConfig> entry :
mqClusterMap.entrySet()) {
@@ -412,7 +450,7 @@ public class MetaConfigHolder extends ConfigHolder {
/**
* store meta config to file
*/
- private boolean storeConfigToFile(String metaJsonStr) {
+ private boolean storeConfigToFile(String inDataMd5, String metaJsonStr) {
boolean isSuccess = false;
String filePath = getFilePath();
if (StringUtils.isBlank(filePath)) {
@@ -431,6 +469,8 @@ public class MetaConfigHolder extends ConfigHolder {
FileUtils.writeStringToFile(tmpNewFile, metaJsonStr,
StandardCharsets.UTF_8);
FileUtils.copyFile(tmpNewFile, sourceFile);
tmpNewFile.delete();
+ tmpDataMd5 = inDataMd5;
+ lastSyncVersion.set(System.currentTimeMillis());
isSuccess = true;
setFileChanged();
} catch (Throwable ex) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index e10ff68dd9..b282cc31ca 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -121,7 +121,7 @@ public class KafkaHandler implements MessageQueueHandler {
String topic = null;
try {
// get idConfig
- IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
+ IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 1b76eaf72a..650aafb637 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -206,7 +206,7 @@ public class PulsarHandler implements MessageQueueHandler {
String producerTopic = null;
try {
// get idConfig
- IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
+ IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
@@ -354,7 +354,7 @@ public class PulsarHandler implements MessageQueueHandler {
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
- sinkContext.fileMetricAddFailStats(simpleProfile,
producerTopic, "", producerTopic);
+ sinkContext.fileMetricAddFailStats(simpleProfile,
producerTopic, msgId.toString(), producerTopic);
sinkContext.processSendFail(simpleProfile, clusterName,
producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index b363f79804..60b3910dc0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -186,7 +186,7 @@ public class TubeHandler implements MessageQueueHandler {
String topic = null;
try {
// idConfig
- IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
+ IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
index 5faf4499c7..b12e952dac 100644
---
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMetaConfigHolder.java
@@ -38,8 +38,8 @@ public class TestMetaConfigHolder {
Assert.assertTrue(result);
Assert.assertEquals(metaConfigHolder.getConfigMd5(),
"5a3f5939bb7368f493bf41c1d785b8f3");
Assert.assertEquals("test_group",
- metaConfigHolder.getTopicName("test_group", "stream1"));
- Assert.assertNull(metaConfigHolder.getTopicName("aaa", "stream1"));
+ metaConfigHolder.getSourceTopicName("test_group", "stream1"));
+ Assert.assertNull(metaConfigHolder.getSourceTopicName("aaa",
"stream1"));
List<CacheClusterConfig> clusterConfigs =
metaConfigHolder.forkCachedCLusterConfig();
Assert.assertEquals(1, clusterConfigs.size());
Assert.assertEquals("test_tubemq",
clusterConfigs.get(0).getClusterName());