This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bfd980ac3 [INLONG-6419][TubeMQ] Correct some misuse of META_DEFAULT_BROKER_PORT (#6420)
bfd980ac3 is described below
commit bfd980ac357143c5b470915056b8a681a7d79c41
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Nov 7 15:27:52 2022 +0800
[INLONG-6419][TubeMQ] Correct some misuse of META_DEFAULT_BROKER_PORT (#6420)
---
.../org/apache/inlong/tubemq/client/consumer/RmtDataCache.java | 4 ++--
.../org/apache/inlong/tubemq/client/producer/ProducerManager.java | 4 ++--
.../apache/inlong/tubemq/corebase/utils/DataConverterUtil.java | 8 ++++++--
.../apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java | 3 ++-
.../server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java | 2 +-
.../master/metamanage/metastore/dao/entity/BrokerConfEntity.java | 2 +-
6 files changed, 14 insertions(+), 9 deletions(-)
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
index 3170f7db2..7cd18cf4c 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
@@ -443,8 +443,8 @@ public class RmtDataCache implements Closeable {
StringBuilder sBuilder) {
if (pkgCheckSum != lstBrokerConfigId.get()) {
if (pkgBrokerInfos != null) {
- brokersMap =
- DataConverterUtil.convertBrokerInfo(pkgBrokerInfos);
+ brokersMap = DataConverterUtil.convertBrokerInfo(
+ pkgBrokerInfos, consumerConfig.isTlsEnable());
lstBrokerConfigId.set(pkgCheckSum);
lastBrokerUpdatedTime = System.currentTimeMillis();
if (pkgBrokerInfos.isEmpty()) {
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 6ce1bfa06..fa5eb7fce 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -567,8 +567,8 @@ public class ProducerManager {
long pkgCheckSum, StringBuilder
sBuilder) {
if (pkgCheckSum != brokerInfoCheckSum) {
if (pkgBrokerInfos != null) {
- brokersMap =
- DataConverterUtil.convertBrokerInfo(pkgBrokerInfos);
+ brokersMap = DataConverterUtil.convertBrokerInfo(
+ pkgBrokerInfos, tubeClientConfig.isTlsEnable());
brokerInfoCheckSum = pkgCheckSum;
lastBrokerUpdatedTime = System.currentTimeMillis();
if (pkgBrokerInfos.isEmpty()) {
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
index 3d63c8e19..8d3736b21 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
@@ -142,15 +142,19 @@ public class DataConverterUtil {
* convert string info to @link BrokerInfo
*
* @param strBrokerInfos return a BrokerInfo Map
+ * @param enableTLS Whether to enable TLS
*/
- public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String>
strBrokerInfos) {
+ public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String>
strBrokerInfos,
+ boolean
enableTLS) {
Map<Integer, BrokerInfo> brokerInfoMap =
new ConcurrentHashMap<>();
if (strBrokerInfos != null) {
+ int brokerPort = enableTLS
+ ? TBaseConstants.META_DEFAULT_BROKER_TLS_PORT :
TBaseConstants.META_DEFAULT_BROKER_PORT;
for (String info : strBrokerInfos) {
if (info != null) {
BrokerInfo brokerInfo =
- new BrokerInfo(info,
TBaseConstants.META_DEFAULT_BROKER_PORT);
+ new BrokerInfo(info, brokerPort);
brokerInfoMap.put(brokerInfo.getBrokerId(), brokerInfo);
}
}
diff --git
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
index f7ef9576e..4721e5d6a 100644
---
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
+++
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -44,7 +44,8 @@ public class DataConverterUtilTest {
BrokerInfo broker = new BrokerInfo(0, "localhost", 1200);
List<String> strInfoList = new ArrayList<>();
strInfoList.add("0:localhost:1200");
- Map<Integer, BrokerInfo> brokerMap =
DataConverterUtil.convertBrokerInfo(strInfoList);
+ Map<Integer, BrokerInfo> brokerMap =
+ DataConverterUtil.convertBrokerInfo(strInfoList, false);
assertEquals("broker should be equal", broker,
brokerMap.get(broker.getBrokerId()));
// partition convert
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 33a648f09..f00be53a9 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -220,7 +220,7 @@ public class BdbBrokerConfEntity implements Serializable {
}
public String getSimpleTLSBrokerInfo() {
- if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_PORT) {
+ if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_TLS_PORT)
{
return this.brokerTLSSimpleInfo;
} else {
return this.brokerTLSFullInfo;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 55691bef8..4a414b1bc 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -188,7 +188,7 @@ public class BrokerConfEntity extends BaseEntity implements
Cloneable {
}
public String getSimpleTLSBrokerInfo() {
- if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_PORT) {
+ if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_TLS_PORT)
{
return this.brokerTLSSimpleInfo;
} else {
return this.brokerTLSFullInfo;