This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 9e184b1049f940b6b1955bd35271119d42dcad54 Author: gosonzhang <[email protected]> AuthorDate: Tue Dec 15 14:08:54 2020 +0800 [TUBEMQ-451]Replace ConsumeTupleInfo with Tuple2 (#349) Co-authored-by: gosonzhang <[email protected]> --- .../org/apache/tubemq/server/master/TMaster.java | 37 +++++++++++----------- .../nodeconsumer/ConsumerInfoHolder.java | 14 ++------ 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java index b1bf1dc..8811cb0 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java @@ -71,6 +71,7 @@ import org.apache.tubemq.corebase.utils.ConcurrentHashSet; import org.apache.tubemq.corebase.utils.DataConverterUtil; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.corerpc.RpcConfig; import org.apache.tubemq.corerpc.RpcConstants; import org.apache.tubemq.corerpc.RpcServiceFactory; @@ -1883,14 +1884,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (consumerId == null) { continue; } - ConsumerInfoHolder.ConsumeTupleInfo tupleInfo = + Tuple2<String, ConsumerInfo> tupleInfo = consumerHolder.getConsumeTupleInfo(consumerId); if (tupleInfo == null - || tupleInfo.groupName == null - || tupleInfo.consumerInfo == null) { + || tupleInfo.f0 == null + || tupleInfo.f1 == null) { continue; } - List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName); + List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); Map<String, List<Partition>> topicSubPartMap = entry.getValue(); List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); @@ -1907,7 +1908,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { currentPartMap = new HashMap<>(); } } - if (tupleInfo.consumerInfo.isOverTLS()) { + if (tupleInfo.f1.isOverTLS()) { for (Partition currentPart : currentPartMap.values()) { if (!blackTopicList.contains(currentPart.getTopic())) { boolean found = false; @@ -1923,8 +1924,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.groupName, - tupleInfo.consumerInfo.isOverTLS(), currentPart)); + .add(new SubscribeInfo(consumerId, tupleInfo.f0, + tupleInfo.f1.isOverTLS(), currentPart)); } for (Partition finalPart : finalPartList) { if (!blackTopicList.contains(finalPart.getTopic())) { @@ -1940,7 +1941,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { continue; } addedSubInfoList.add(new SubscribeInfo(consumerId, - tupleInfo.groupName, true, finalPart)); + tupleInfo.f0, true, finalPart)); } } } else { @@ -1948,14 +1949,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if ((blackTopicList.contains(currentPart.getTopic())) || (!finalPartList.contains(currentPart))) { deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.groupName, false, currentPart)); + .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart)); } } for (Partition finalPart : finalPartList) { if ((currentPartMap.get(finalPart.getPartitionKey()) == null) && (!blackTopicList.contains(finalPart.getTopic()))) { addedSubInfoList.add(new SubscribeInfo(consumerId, - tupleInfo.groupName, false, finalPart)); + tupleInfo.f0, false, finalPart)); } } } @@ -2033,16 +2034,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (consumerId == null) { continue; } - ConsumerInfoHolder.ConsumeTupleInfo tupleInfo = + Tuple2<String, ConsumerInfo> tupleInfo = consumerHolder.getConsumeTupleInfo(consumerId); if (tupleInfo == null - || tupleInfo.groupName == null - || tupleInfo.consumerInfo == null) { + || tupleInfo.f0 == null + || tupleInfo.f1 == null) { continue; } List<String> blackTopicList = - this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName); + this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue(); List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); @@ -2066,15 +2067,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if ((blackTopicList.contains(currentPart.getTopic())) || (finalPartMap.get(currentPart.getPartitionKey()) == null)) { deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.groupName, - tupleInfo.consumerInfo.isOverTLS(), currentPart)); + .add(new SubscribeInfo(consumerId, tupleInfo.f0, + tupleInfo.f1.isOverTLS(), currentPart)); } } for (Partition finalPart : finalPartMap.values()) { if ((currentPartMap.get(finalPart.getPartitionKey()) == null) && (!blackTopicList.contains(finalPart.getTopic()))) { - addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.groupName, - tupleInfo.consumerInfo.isOverTLS(), finalPart)); + addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0, + tupleInfo.f1.isOverTLS(), finalPart)); } } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java index 5c78858..fc5d932 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.tubemq.corebase.cluster.ConsumerInfo; +import org.apache.tubemq.corebase.utils.Tuple2; public class ConsumerInfoHolder { @@ -369,7 +370,7 @@ public class ConsumerInfoHolder { return consumer; } - public ConsumeTupleInfo getConsumeTupleInfo(String consumerId) { + public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) { try { rwLock.readLock().lock(); ConsumerInfo consumerInfo = null; @@ -378,7 +379,7 @@ public class ConsumerInfoHolder { if (consumeBandInfo != null) { consumerInfo = consumeBandInfo.getConsumerInfo(consumerId); } - return new ConsumeTupleInfo(groupName, consumerInfo); + return new Tuple2<String, ConsumerInfo>(groupName, consumerInfo); } finally { rwLock.readLock().unlock(); } @@ -424,13 +425,4 @@ public class ConsumerInfoHolder { groupInfoMap.clear(); } - public class ConsumeTupleInfo { - public String groupName; - public ConsumerInfo consumerInfo; - - public ConsumeTupleInfo(String groupName, ConsumerInfo consumerInfo) { - this.groupName = groupName; - this.consumerInfo = consumerInfo; - } - } }
