This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a45ecb1634 [INLONG-8819][DataProxy] Optimize ConfigHolder related
subclass loading processing (#8820)
a45ecb1634 is described below
commit a45ecb16342cbb00015e05a88bb367ec53e683dd
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Aug 30 18:45:01 2023 +0800
[INLONG-8819][DataProxy] Optimize ConfigHolder related subclass loading
processing (#8820)
---
.../inlong/dataproxy/config/ConfigHolder.java | 26 +++++++-----
.../dataproxy/config/holder/MetaConfigHolder.java | 47 +++++++++++-----------
.../dataproxy/config/holder/PropertiesHolder.java | 15 +++----
.../dataproxy/config/holder/VisitConfigHolder.java | 19 ++++++---
4 files changed, 63 insertions(+), 44 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index e07983550b..3060d9ee26 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -43,7 +43,7 @@ public abstract class ConfigHolder {
private final AtomicBoolean fileChanged = new AtomicBoolean(false);
// list of callbacks for this holder
private final List<ConfigUpdateCallback> callbackList = new ArrayList<>();
- private long lastModifyTime;
+ private long lastModifyTime = 0;
private String filePath;
private File configFile;
@@ -51,9 +51,6 @@ public abstract class ConfigHolder {
this.fileName = fileName;
setFilePath(fileName);
CONFIG_HOLDER_LIST.add(this);
- if (configFile != null) {
- this.lastModifyTime = configFile.lastModified();
- }
}
/**
@@ -77,7 +74,7 @@ public abstract class ConfigHolder {
/**
* load from file to holder
*
- * @return - true if configure updated
+ * @return - true if the configure file read, otherwise it will be false.
*/
protected abstract boolean loadFromFileToHolder();
@@ -89,11 +86,22 @@ public abstract class ConfigHolder {
public boolean checkAndUpdateHolder() {
if (fileChanged.compareAndSet(true, false)
|| (configFile != null && configFile.lastModified() !=
this.lastModifyTime)) {
- if (configFile != null) {
- this.lastModifyTime = configFile.lastModified();
+ long startTime = System.currentTimeMillis();
+ if (loadFromFileToHolder()) {
+ boolean initialized = (this.lastModifyTime != 0L);
+ if (configFile != null) {
+ this.lastModifyTime = configFile.lastModified();
+ }
+ if (initialized) {
+ LOG.info("File {} has changed, reload from local file, wast {} ms",
+ this.fileName, (System.currentTimeMillis() - startTime));
+ } else {
+ LOG.info("File {} has imported, reload from local file, wast {} ms",
+ this.fileName, (System.currentTimeMillis() - startTime));
+ }
+ return true;
}
- LOG.info("File {} has changed, reload from local file", this.fileName);
- return loadFromFileToHolder();
+ LOG.warn("File {} has changed, but reload content failure", this.fileName);
}
return false;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 5cc4d652b3..c28da53a3a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -178,7 +178,7 @@ public class MetaConfigHolder extends ConfigHolder {
getFileName(), System.currentTimeMillis() -
this.lastSyncVersion.get());
return false;
} else {
- if (inDataMd5.equalsIgnoreCase(dataMd5)) {
+ if (inDataMd5.equals(dataMd5)) {
return false;
}
}
@@ -223,36 +223,42 @@ public class MetaConfigHolder extends ConfigHolder {
@Override
protected boolean loadFromFileToHolder() {
+ // check meta update setting
+ if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
+ && !ConfigManager.handshakeManagerOk.get()) {
+ LOG.warn("Failed to load json config from {}, don't obtain metadata from the Manager,"
+ + " and the startup via the cache file is false", getFileName());
+ return false;
+ }
String jsonString = "";
readWriteLock.readLock().lock();
try {
jsonString = loadConfigFromFile();
if (StringUtils.isBlank(jsonString)) {
- LOG.info("Load changed json {}, but no records configured", getFileName());
- return false;
+ LOG.warn("Load changed json {}, but no records configured", getFileName());
+ return true;
}
DataProxyConfigResponse metaConfig =
GSON.fromJson(jsonString, DataProxyConfigResponse.class);
if (!metaConfig.isResult() || metaConfig.getErrCode() !=
DataProxyConfigResponse.SUCC) {
LOG.warn("Load failed json config from {}, error code is {}",
getFileName(), metaConfig.getErrCode());
- return false;
+ return true;
}
DataProxyCluster clusterObj = metaConfig.getData();
if (clusterObj == null) {
LOG.warn("Load failed json config from {}, malformed content, data is null", getFileName());
- return false;
- }
- if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
- && !ConfigManager.handshakeManagerOk.get()) {
- LOG.info("Failed to load json config from {}, don't obtain metadata from the Manager,"
- + " and the startup via the cache file is false", getFileName());
- return false;
+ return true;
}
// update cache data
- if (updateCacheData(clusterObj)) {
- // update cache string
- synchronized (this.lastUpdVersion) {
+ boolean updated;
+ synchronized (this.lastUpdVersion) {
+ if (metaConfig.getMd5().equals(this.dataMd5)) {
+ LOG.warn("Load json config from {}, configure md5 not changed!", getFileName());
+ return true;
+ }
+ updated = updateCacheData(clusterObj);
+ if (updated) {
if (this.lastSyncVersion.get() == 0) {
this.lastUpdVersion.set(System.currentTimeMillis());
this.lastSyncVersion.compareAndSet(0,
this.lastUpdVersion.get());
@@ -262,14 +268,12 @@ public class MetaConfigHolder extends ConfigHolder {
this.dataMd5 = metaConfig.getMd5();
this.dataStr = jsonString;
}
- LOG.info(
- "Load changed {}, loaded dataMd5={}, data={}, id2TopicSrcMap={}, mqClusterMap={}, id2TopicSinkMap={}",
- getFileName(), dataMd5, dataStr, id2TopicSrcMap, mqClusterMap, id2TopicSinkMap);
- return true;
}
- return false;
+ if (updated) {
+ LOG.info("Load changed {} file success!", getFileName());
+ }
+ return true;
} catch (Throwable e) {
- //
LOG.warn("Process json {} changed data {} failure", getFileName(),
jsonString, e);
return false;
} finally {
@@ -324,9 +328,6 @@ public class MetaConfigHolder extends ConfigHolder {
// get topic config info
Map<String, IdTopicConfig> tmpTopicConfigMap =
buildCacheTopicConfig(mqType, inLongIds);
replaceCacheConfig(mqType, tmpClusterConfigMap, tmpTopicConfigMap);
- if (mqType.equals(CacheType.TUBE)) {
- executeCallbacks();
- }
return true;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesHolder.java
index 68ebd0e126..6c7ba90da1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesHolder.java
@@ -84,15 +84,15 @@ public abstract class PropertiesHolder extends ConfigHolder {
try {
Map<String, String> loadMap = loadConfigFromFile();
if (loadMap == null || loadMap.isEmpty()) {
- LOG.debug("Load changed properties {}, but no records configured", getFileName());
- return false;
+ LOG.warn("Load changed properties {}, but no records configured", getFileName());
+ return true;
}
// filter blank items
Map<String, String> filteredMap = filterInValidRecords(loadMap);
if (filteredMap.isEmpty()) {
- LOG.info("Load changed properties {}, but the records are all illegal {}",
+ LOG.warn("Load changed properties {}, but the records are all illegal {}",
getFileName(), loadMap);
- return false;
+ return true;
}
// remove records
Set<String> rmvKeys = new HashSet<>();
@@ -116,13 +116,14 @@ public abstract class PropertiesHolder extends ConfigHolder {
}
}
if (rmvKeys.isEmpty() && repKeys.isEmpty()) {
- return false;
+ LOG.warn("Load changed properties {}, but no add or delete records", getFileName());
+ return true;
}
// update cache data
boolean result = updateCacheData();
// output update result
- LOG.info("Load changed properties {}, loaded config {}, updated holder {}, updated cache {}",
- getFileName(), loadMap, confHolder, result);
+ LOG.info("Load changed properties {}, deleted_record = {}, updated_record = {}, updated_cache = {}",
+ getFileName(), rmvKeys.isEmpty(), repKeys.isEmpty(), result);
return true;
} finally {
readWriteLock.readLock().unlock();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
index d7a7277b86..530eadfa6a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
@@ -63,7 +63,7 @@ public class VisitConfigHolder extends ConfigHolder {
try {
Map<String, Long> tmpHolder = loadFile();
if (tmpHolder == null) {
- return false;
+ return true;
}
// clear removed records
boolean added = false;
@@ -126,14 +126,23 @@ public class VisitConfigHolder extends ConfigHolder {
added = true;
}
}
- if (!tmpKeys.isEmpty()) {
- if (this.isBlackList) {
+ // output load result
+ if (this.isBlackList) {
+ if (!tmpKeys.isEmpty()) {
LOG.warn("Load BlackList data error, found error data items: " + tmpKeys);
- } else {
+ }
+ if (added) {
+ LOG.info("Load BlackList data, new data items are added!");
+ }
+ } else {
+ if (!tmpKeys.isEmpty()) {
LOG.warn("Load WhiteList data error, found error data items: " + tmpKeys);
}
+ if (removed) {
+ LOG.info("Load WhiteList data, cached data items are deleted!");
+ }
}
- return (isBlackList && added) || (!isBlackList && removed);
+ return true;
} finally {
readWriteLock.writeLock().unlock();
}