This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6c3d72b0ff [INLONG-8192][DataProxy] The topic name generated by
dataproxy is incorrect (#8194)
6c3d72b0ff is described below
commit 6c3d72b0ffb0064a0147b41b0d774f3570bd6a63
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jun 8 21:30:54 2023 +0800
[INLONG-8192][DataProxy] The topic name generated by dataproxy is incorrect
(#8194)
---
.../dataproxy/config/holder/MetaConfigHolder.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 9c7078ee48..70a07bde2c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -145,12 +145,9 @@ public class MetaConfigHolder extends ConfigHolder {
}
public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
- if (StringUtils.isBlank(inDataMd5) ||
StringUtils.isBlank(inDataJsonStr)) {
- return false;
- }
- if (inDataMd5.equalsIgnoreCase(dataMd5)) {
- LOG.info("Update json {}, but the stored md5sum {} is equals to
changed md5sum {}",
- getFileName(), dataMd5, inDataMd5);
+ if (StringUtils.isBlank(inDataMd5)
+ || StringUtils.isBlank(inDataJsonStr)
+ || inDataMd5.equalsIgnoreCase(dataMd5)) {
return false;
}
if (storeConfigToFile(inDataJsonStr)) {
@@ -368,12 +365,15 @@ public class MetaConfigHolder extends ConfigHolder {
}
tenant =
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
nameSpace =
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
+ if (StringUtils.isBlank(idObject.getTopic())) {
+ // namespace field must exist and value not be empty,
+ // otherwise it is an illegal configuration item.
+ continue;
+ }
if (mqType.equals(CacheType.TUBE)) {
- if (StringUtils.isNotBlank(nameSpace)) {
- topicName = nameSpace;
- }
+ topicName = nameSpace;
} else if (mqType.equals(CacheType.KAFKA)) {
- if (StringUtils.isNotBlank(nameSpace)) {
+ if (topicName.equals(streamId)) {
topicName =
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
}
}