This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a490eb85b [INLONG-6421][TubeMQ] Fix the host is blank error (#6425)
a490eb85b is described below
commit a490eb85b12f652921429f3cdcf11bb00e38f0f6
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Nov 7 17:22:11 2022 +0800
[INLONG-6421][TubeMQ] Fix the host is blank error (#6425)
---
.../master/bdbstore/bdbentitys/BdbBrokerConfEntity.java | 2 +-
.../server/master/metamanage/DefaultMetaDataService.java | 16 +++++++++-------
.../metastore/dao/entity/BrokerConfEntity.java | 3 ++-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 8 ++++----
4 files changed, 16 insertions(+), 13 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index f00be53a9..98a8f5a1e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -526,7 +526,7 @@ public class BdbBrokerConfEntity implements Serializable {
String.valueOf(brokerWebPort));
}
- private void buildStrInfo() {
+ public void buildStrInfo() {
StringBuilder sBuilder = new StringBuilder(512);
this.brokerAddress = sBuilder.append(this.brokerIp)
.append(TokenConstants.ATTR_SEP)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index c26ac6118..40d83c156 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -126,8 +126,8 @@ public class DefaultMetaDataService implements
MetaDataService {
// initial running data
BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
brokerRunManager.updBrokerStaticInfo(this.metaConfigMapper.getBrokerConfInfo(null));
- isStarted = true;
serviceStartTime = System.currentTimeMillis();
+ isStarted = true;
logger.info("DefaultMetaDataService Started");
}
@@ -363,17 +363,19 @@ public class DefaultMetaDataService implements
MetaDataService {
if (!metaConfigMapper.addOrUpdBrokerConfig(isAddOp, entity, strBuff,
result)) {
return new BrokerProcessResult(entity.getBrokerId(),
entity.getBrokerIp(), result);
}
+ BrokerConfEntity curEntity =
+ metaConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
// update broker static information
- this.tMaster.getBrokerRunManager().updBrokerStaticInfo(entity);
+ this.tMaster.getBrokerRunManager().updBrokerStaticInfo(curEntity);
if (isAddOp) {
// add system topic if absent
- metaConfigMapper.addSystemTopicDeploy(entity.getBrokerId(),
- entity.getBrokerPort(), entity.getBrokerIp(), strBuff);
+ metaConfigMapper.addSystemTopicDeploy(curEntity.getBrokerId(),
+ curEntity.getBrokerPort(), curEntity.getBrokerIp(),
strBuff);
}
// auto trigger configure sync
- triggerBrokerConfDataSync(entity.getBrokerId(), strBuff, result);
+ triggerBrokerConfDataSync(curEntity.getBrokerId(), strBuff, result);
// return result
- return new BrokerProcessResult(entity.getBrokerId(),
entity.getBrokerIp(), result);
+ return new BrokerProcessResult(curEntity.getBrokerId(),
curEntity.getBrokerIp(), result);
}
@Override
@@ -502,7 +504,7 @@ public class DefaultMetaDataService implements
MetaDataService {
return result.isSuccess();
}
runStatusInfo.notifyDataChanged();
- logger.info(strBuff.append("[Meta data] trigger broker syncStatus
info, brokerId is ")
+ logger.info(strBuff.append("[Meta data] trigger broker syncStatus
info, brokerId=")
.append(brokerId).toString());
strBuff.delete(0, strBuff.length());
result.setSuccResult(null);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 4a414b1bc..4ddc4117f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -119,6 +119,7 @@ public class BrokerConfEntity extends BaseEntity implements
Cloneable {
bdbEntity.setDftMemCacheFlushIntvl(topicProps.getMemCacheFlushIntvl());
bdbEntity.setDftUnFlushDataHold(topicProps.getUnflushDataHold());
bdbEntity.setDataStore(topicProps.getDataStoreType(),
topicProps.getDataPath());
+ bdbEntity.buildStrInfo();
return bdbEntity;
}
@@ -306,8 +307,8 @@ public class BrokerConfEntity extends BaseEntity implements
Cloneable {
}
}
if (changed) {
- updSerialId();
buildStrInfo();
+ updSerialId();
}
return changed;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 26678c2b1..c1f781f85 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -288,9 +288,9 @@ public class DefBrokerRunManager implements
BrokerRunManager, ConfigObserver {
// process removed topic info
if (isTackRmvInfo) {
metaDataService.delCleanedTopicDeployInfo(brokerId, removedTopics,
sBuffer, result);
- logger.info(sBuffer.append("[Broker Report] receive broker removed
topics = ")
- .append(removedTopics.toString()).append(", removed result
is ")
- .append(result.getErrMsg()).toString());
+ logger.info(sBuffer.append("[Broker Report]
brokerId=").append(brokerId)
+ .append(" removed topics = ").append(removedTopics)
+ .append(", removed result is
").append(result.getErrMsg()).toString());
sBuffer.delete(0, sBuffer.length());
}
brokerAbnHolder.updateBrokerReportStatus(brokerId, rptReadStatus,
rptWriteStatus);
@@ -317,7 +317,7 @@ public class DefBrokerRunManager implements
BrokerRunManager, ConfigObserver {
}
boolean isOverTls = runStatusInfo.isOverTLS();
releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId(), false);
- logger.info(sBuffer.append("[Broker Closed]").append(brokerId)
+ logger.info(sBuffer.append("[Broker Closed]
brokerId=").append(brokerId)
.append(" unregister success,
isOverTLS=").append(isOverTls).toString());
result.setSuccResult(null);
return result.isSuccess();