This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit cc5796a424b7fde4d51d562eed39093550526379 Author: gosonzhang <[email protected]> AuthorDate: Fri Dec 25 10:40:51 2020 +0800 [TUBEMQ-475] add the offset clone api of the consume group --- .../broker/metadata/BrokerMetadataManager.java | 3 +- .../server/broker/metadata/MetadataManager.java | 2 + .../server/broker/metadata/TopicMetadata.java | 16 ++ .../server/broker/offset/DefaultOffsetManager.java | 199 ++++++++++++++++- .../tubemq/server/broker/offset/OffsetService.java | 18 ++ .../server/broker/web/BrokerAdminServlet.java | 246 +++++++++++++++++++++ .../tubemq/server/common/fielddef/WebFieldDef.java | 10 +- .../server/common/offsetstorage/OffsetStorage.java | 11 +- .../common/offsetstorage/ZkOffsetStorage.java | 139 +++++++++++- .../common/offsetstorage/zookeeper/ZKUtil.java | 19 ++ .../org/apache/tubemq/server/master/TMaster.java | 3 +- 11 files changed, 647 insertions(+), 19 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java index 1e2f21e..8568372 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java @@ -143,7 +143,8 @@ public class BrokerMetadataManager implements MetadataManager { return topicConfigMap.get(topic); } - public ConcurrentHashMap<String, TopicMetadata> getTopicConfigMap() { + @Override + public Map<String, TopicMetadata> getTopicConfigMap() { return topicConfigMap; } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java index d638e5a..9ee9936 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/MetadataManager.java @@ -80,4 +80,6 @@ public interface MetadataManager { String getDefDeletePolicy(); String getTopicDeletePolicy(String topic); + + Map<String, TopicMetadata> getTopicConfigMap(); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java index 4ebfa4d..c582606 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java @@ -17,10 +17,15 @@ package org.apache.tubemq.server.broker.metadata; +import java.util.HashSet; +import java.util.Set; + +import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.common.TStatusConstants; + /*** * Topic's metadata. Contains topic name, partitions count, etc. */ @@ -235,6 +240,17 @@ public class TopicMetadata { this.unflushInterval = unflushInterval; } + // builder the partitionId set for each store + public Set<Integer> getAllPartitionIds() { + Set<Integer> partIds = new HashSet<>(); + for (int i = 0; i < numTopicStores; i++) { + for (int j = 0; j < numPartitions; j++) { + partIds.add(i * TBaseConstants.META_STORE_INS_BASE + j); + } + } + return partIds; + } + public int getStatusId() { return statusId; } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index 82a758c..bdd85b3 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -17,13 +17,17 @@ package org.apache.tubemq.server.broker.offset; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.daemon.AbstractDaemonService; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.BrokerConfig; import org.apache.tubemq.server.broker.msgstore.MessageStore; +import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.apache.tubemq.server.common.offsetstorage.OffsetStorage; import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo; @@ -49,7 +53,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse public DefaultOffsetManager(final BrokerConfig brokerConfig) { super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs()); this.brokerConfig = brokerConfig; - zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig()); + zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig(), + true, brokerConfig.getBrokerId()); super.start(); } @@ -323,6 +328,187 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse } /*** + * Get in-memory and in zk group set + * + * @return booked group in memory and in zk + */ + @Override + public Set<String> getBookedGroups() { + Set<String> groupSet = new HashSet<>(); + groupSet.addAll(cfmOffsetMap.keySet()); + Map<String, Set<String>> localGroups = + zkOffsetStorage.getZkLocalGroupTopicInfos(); + groupSet.addAll(localGroups.keySet()); + return groupSet; + } + + /*** + * Get in-memory group set + * + * @return booked group in memory + */ + public Set<String> getInMemoryGroups() { + Set<String> cacheGroup = new HashSet<>(); + cacheGroup.addAll(cfmOffsetMap.keySet()); + return cacheGroup; + } + + /*** + * Get in-zookeeper but not in memory's group set + * + * @return booked group in zookeeper + */ + @Override + public Set<String> getUnusedGroupInfo() { + Set<String> unUsedGroups = new HashSet<>(); + Map<String, Set<String>> localGroups = + zkOffsetStorage.getZkLocalGroupTopicInfos(); + for (String groupName : localGroups.keySet()) { + if (!cfmOffsetMap.containsKey(groupName)) { + unUsedGroups.add(groupName); + } + } + return unUsedGroups; + } + + /*** + * Get the topic set subscribed by the consumer group + * @param group + * @return topic set subscribed + */ + @Override + public Set<String> getGroupSubInfo(String group) { + Set<String> result = new HashSet<>(); + Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group); + if (topicPartOffsetMap == null) { + Map<String, Set<String>> localGroups = + zkOffsetStorage.getZkLocalGroupTopicInfos(); + result = localGroups.get(group); + } else { + for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) { + result.add(storageInfo.getTopic()); + } + } + return result; + } + + /*** + * Get group's offset by Specified topic-partitions + * @param group + * @param topicPartMap + * @return group offset info in memory or zk + */ + @Override + public Map<String, Map<Integer, Long>> queryGroupOffset( + String group, Map<String, Set<Integer>> topicPartMap) { + Map<String, Map<Integer, Long>> result = new HashMap<>(); + // search group from memory + Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group); + if (topicPartOffsetMap == null) { + // query from zookeeper + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + Map<Integer, Long> qryResult = + zkOffsetStorage.queryGroupOffsetInfo( + group, entry.getKey(), entry.getValue()); + Map<Integer, Long> offsetMap = new HashMap<>(); + for (Map.Entry<Integer, Long> item : qryResult.entrySet()) { + if (item.getValue() != null) { + offsetMap.put(item.getKey(), item.getValue()); + } + } + if (!offsetMap.isEmpty()) { + result.put(entry.getKey(), offsetMap); + } + } + } else { + // found in memory, get offset values + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + Map<Integer, Long> offsetMap = new HashMap<>(); + for (Integer partitionId : entry.getValue()) { + String offsetCacheKey = + getOffsetCacheKey(entry.getKey(), partitionId); + OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey); + if (offsetInfo != null) { + offsetMap.put(partitionId, offsetInfo.getOffset()); + } + } + if (!offsetMap.isEmpty()) { + result.put(entry.getKey(), offsetMap); + } + } + } + return result; + } + + + /*** + * Reset offset. + * + * @param storeManager + * @param groups + * @param topicPartOffsetMap + * @param modifier + * @return at least one record modified + */ + @Override + public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups, + Map<String, Map<Integer, Long>> topicPartOffsetMap, + String modifier) { + long oldOffset = -1; + long reSetOffset = -1; + boolean changed = false; + MessageStore store = null; + StringBuilder strBuidler = new StringBuilder(512); + // set offset by group + for (String group : groups) { + for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) { + Map<Integer, Long> partOffsetMap = entry.getValue(); + if (partOffsetMap == null) { + continue; + } + // set offset + for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) { + if (entry1.getValue() == null) { + continue; + } + reSetOffset = entry1.getValue(); + // get topic store + try { + store = storeManager.getOrCreateMessageStore( + entry.getKey(), entry1.getKey()); + } catch (Throwable e) { + // + } + if (store == null) { + continue; + } + long firstOffset = store.getIndexMinOffset(); + long lastOffset = store.getIndexMaxOffset(); + // adjust reseted offset value + reSetOffset = reSetOffset < firstOffset + ? firstOffset : Math.min(reSetOffset, lastOffset); + String offsetCacheKey = + getOffsetCacheKey(entry.getKey(), entry1.getKey()); + getAndResetTmpOffset(group, offsetCacheKey); + OffsetStorageInfo regInfo = loadOrCreateOffset(group, + entry.getKey(), entry1.getKey(), offsetCacheKey, 0); + oldOffset = regInfo.getAndSetOffset(reSetOffset); + changed = true; + logger.info(strBuidler + .append("[Offset Manager] Update offset by modifier=") + .append(modifier).append(",reset offset=").append(reSetOffset) + .append(",old offset=").append(oldOffset) + .append(",updated offset=").append(regInfo.getOffset()) + .append(",group=").append(group) + .append(",topic-partId=").append(offsetCacheKey).toString()); + strBuidler.delete(0, strBuidler.length()); + } + } + } + return changed; + } + + /*** * Set temp offset. * * @param group @@ -425,10 +611,10 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse OffsetStorageInfo regInfo = regInfoMap.get(offsetCacheKey); if (regInfo == null) { OffsetStorageInfo tmpRegInfo = - zkOffsetStorage.loadOffset(group, topic, brokerConfig.getBrokerId(), partitionId); + zkOffsetStorage.loadOffset(group, topic, partitionId); if (tmpRegInfo == null) { - tmpRegInfo = - new OffsetStorageInfo(topic, brokerConfig.getBrokerId(), partitionId, defOffset, 0); + tmpRegInfo = new OffsetStorageInfo(topic, + brokerConfig.getBrokerId(), partitionId, defOffset, 0); } regInfo = regInfoMap.putIfAbsent(offsetCacheKey, tmpRegInfo); if (regInfo == null) { @@ -443,4 +629,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse .append("-").append(partitionId).toString(); } + private String getOffsetCacheKey(String topic, String partitionId) { + return new StringBuilder(256).append(topic) + .append("-").append(partitionId).toString(); + } + } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java index 066fb3b..05f0724 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java @@ -17,9 +17,13 @@ package org.apache.tubemq.server.broker.offset; +import java.util.Map; +import java.util.Set; import org.apache.tubemq.server.broker.msgstore.MessageStore; +import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo; + /*** * Offset manager service interface. */ @@ -51,4 +55,18 @@ public interface OffsetService { long getTmpOffset(final String group, final String topic, int partitionId); + Set<String> getBookedGroups(); + + Set<String> getInMemoryGroups(); + + Set<String> getUnusedGroupInfo(); + + Set<String> getGroupSubInfo(String group); + + Map<String, Map<Integer, Long>> queryGroupOffset( + String group, Map<String, Set<Integer>> topicPartMap); + + boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups, + Map<String, Map<Integer, Long>> topicPartOffsetMap, + String modifier); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index 1b3f524..91bfd23 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -17,6 +17,8 @@ package org.apache.tubemq.server.broker.web; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -26,6 +28,7 @@ import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.TubeBroker; +import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; @@ -71,6 +74,15 @@ public class BrokerAdminServlet extends AbstractWebHandler { // get all registered methods innRegisterWebMethod("admin_get_methods", "adminQueryAllMethods"); + // Query all consumer groups booked on the Broker. + innRegisterWebMethod("admin_query_group", + "adminQueryBookedGroup"); + // query consumer group's offset + innRegisterWebMethod("admin_query_offset", + "adminQueryGroupOffSet"); + // clone consumer group's offset from source to target + innRegisterWebMethod("admin_clone_offset", + "adminCloneGroupOffSet"); } public void adminQueryAllMethods(HttpServletRequest req, @@ -560,5 +572,239 @@ public class BrokerAdminServlet extends AbstractWebHandler { sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}"); } + /*** + * Query all consumer groups booked on the Broker. + * + * @param req + * @param sBuilder process result + */ + public void adminQueryBookedGroup(HttpServletRequest req, + StringBuilder sBuilder) { + // get group list + ProcessResult result = WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.WITHDIVIDE, false, false); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + boolean withDivide = (boolean) result.retData1; + // get offset service + int itemCnt = 0; + int totalCnt = 0; + OffsetService offsetService = broker.getOffsetManager(); + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); + if (withDivide) { + // query in-memory group name set + Set<String> onlineGroups = offsetService.getInMemoryGroups(); + sBuilder.append("{\"type\":\"in-cache\",\"groupName\":["); + for (String group : onlineGroups) { + if (itemCnt++ > 0) { + sBuilder.append(","); + } + sBuilder.append("\"").append(group).append("\""); + } + sBuilder.append("],\"groupCount\":").append(itemCnt).append("}"); + totalCnt++; + sBuilder.append(","); + // query in-zk group name set + itemCnt = 0; + Set<String> onZKGroup = offsetService.getUnusedGroupInfo(); + sBuilder.append("{\"type\":\"in-zk\",\"groupName\":["); + for (String group : onZKGroup) { + if (itemCnt++ > 0) { + sBuilder.append(","); + } + sBuilder.append("\"").append(group).append("\""); + } + sBuilder.append("],\"groupCount\":").append(itemCnt).append("}"); + totalCnt++; + } else { + Set<String> allGroups = offsetService.getBookedGroups(); + sBuilder.append("{\"type\":\"all\",\"groupName\":["); + for (String group : allGroups) { + if (itemCnt++ > 0) { + sBuilder.append(","); + } + sBuilder.append("\"").append(group).append("\""); + } + sBuilder.append("],\"groupCount\":").append(itemCnt).append("}"); + totalCnt++; + } + sBuilder.append("],\"dataCount\":").append(totalCnt).append("}"); + } + + /*** + * Query consumer group offset. + * + * @param req + * @param sBuilder process result + */ + public void adminQueryGroupOffSet(HttpServletRequest req, + StringBuilder sBuilder) { + // get group list + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, false, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // filter invalid groups + Set<String> qryGroupNameSet = new HashSet<>(); + Set<String> inGroupNameSet = (Set<String>) result.retData1; + Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups(); + if (inGroupNameSet.isEmpty()) { + qryGroupNameSet = bookedGroupSet; + } else { + for (String group : inGroupNameSet) { + if (bookedGroupSet.contains(group)) { + qryGroupNameSet.add(group); + } + } + } + // get the topic set to be queried + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // get target consume group name + Set<String> topicSet = (Set<String>) result.retData1; + // verify the acquired Topic set and + // query the corresponding offset information + Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>(); + for (String group : qryGroupNameSet) { + Map<String, Set<Integer>> topicPartMap = + validAndGetPartitions(group, topicSet); + Map<String, Map<Integer, Long>> groupOffsetMap = + broker.getOffsetManager().queryGroupOffset(group, topicPartMap); + groupOffsetMaps.put(group, groupOffsetMap); + } + // builder result + int totalCnt = 0; + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); + for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry + : groupOffsetMaps.entrySet()) { + if (totalCnt++ > 0) { + sBuilder.append(","); + } + Map<String, Map<Integer, Long>> topicPartMap = entry.getValue(); + sBuilder.append("{\"groupName\":\"").append(entry.getKey()) + .append("\",\"subInfo\":["); + int topicCnt = 0; + for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) { + if (topicCnt++ > 0) { + sBuilder.append(","); + } + Map<Integer, Long> partOffMap = entry1.getValue(); + sBuilder.append("{\"topicName\":\"").append(entry1.getKey()) + .append("\",\"offsets\":["); + int partCnt = 0; + for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) { + if (partCnt++ > 0) { + sBuilder.append(","); + } + sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId()) + .append(TokenConstants.ATTR_SEP).append(entry1.getKey()) + .append(TokenConstants.ATTR_SEP).append(entry2.getKey()) + .append("\":").append(entry2.getValue()).append("}"); + } + sBuilder.append("],\"partCount\":").append(partCnt).append("}"); + } + sBuilder.append("],\"topicCount\":").append(topicCnt).append("}"); + } + sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}"); + } + + /*** + * Clone consume group offset, clone A group's offset to other group. + * + * @param req + * @param sBuilder process result + */ + public void adminCloneGroupOffSet(HttpServletRequest req, + StringBuilder sBuilder) { + // get source consume group name + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.SRCGROUPNAME, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + final String srcGroupName = (String) result.retData1; + // get modify user + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + final String modifier = (String) result.retData1; + // get source consume group's topic set cloned to target group + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // get target consume group name + Set<String> srcTopicNameSet = (Set<String>) result.retData1; + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.TGTCOMPSGROUPNAME, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Set<String> tgtGroupNameSet = (Set<String>) result.retData1; + // check sourceGroup if existed + Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups(); + if (!bookedGroups.contains(srcGroupName)) { + WebParameterUtils.buildFailResult(sBuilder, + new StringBuilder(512).append("Parameter ") + .append(WebFieldDef.SRCGROUPNAME.name).append(": ") + .append(srcGroupName) + .append(" has not been registered on this Broker!").toString()); + return; + } + // valid topic and get topic's partitionIds + Map<String, Set<Integer>> topicPartMap = + validAndGetPartitions(srcGroupName, srcTopicNameSet); + if (topicPartMap.isEmpty()) { + WebParameterUtils.buildFailResult(sBuilder, + new StringBuilder(512).append("Parameter ") + .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ") + .append(srcGroupName).append(" subscribed topic set!").toString()); + return; + } + // query offset from source group + Map<String, Map<Integer, Long>> srcGroupOffsets = + broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap); + boolean changed = broker.getOffsetManager().modifyGroupOffset( + broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier); + // builder return result + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); + } + + private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) { + Map<String, Set<Integer>> topicPartMap = new HashMap<>(); + // query stored topic set stored in memory or zk + if (topicSet.isEmpty()) { + topicSet = broker.getOffsetManager().getGroupSubInfo(group); + } + // get topic's partitionIds + if (topicSet != null) { + Map<String, TopicMetadata> topicConfigMap = + broker.getMetadataManager().getTopicConfigMap(); + if (topicConfigMap != null) { + for (String topic : topicSet) { + TopicMetadata topicMetadata = topicConfigMap.get(topic); + if (topicMetadata != null) { + topicPartMap.put(topic, topicMetadata.getAllPartitionIds()); + } + } + } + } + return topicPartMap; + } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java index f73959e..ec97421 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -70,7 +70,15 @@ public enum WebFieldDef { COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT, "Broker ID", RegexDef.TMP_NUMBER), WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN, - "Require return ip information, default is false"); + "Require return ip information, default is false"), + WITHDIVIDE(17, "divide", "div", WebFieldType.BOOLEAN, + "Need to divide the returned result, default is false"), + SRCGROUPNAME(18, "sourceGroupName", "srcGroup", WebFieldType.STRING, + "Offset clone source group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH, + RegexDef.TMP_GROUP), + TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup", + WebFieldType.COMPSTRING, "Offset clone target group name", + TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java index 470aafd..dca7ca8 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java @@ -18,6 +18,8 @@ package org.apache.tubemq.server.common.offsetstorage; import java.util.Collection; +import java.util.Map; +import java.util.Set; public interface OffsetStorage { @@ -25,9 +27,16 @@ public interface OffsetStorage { void close(); OffsetStorageInfo loadOffset(final String group, - final String topic, int brokerId, int partitionId); + final String topic, int partitionId); void commitOffset(final String group, final Collection<OffsetStorageInfo> offsetInfoList, boolean isFailRetry); + + Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos(); + + Map<String, Set<String>> getZkLocalGroupTopicInfos(); + + Map<Integer, Long> queryGroupOffsetInfo(String group, String topic, + Set<Integer> partitionIds); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java index 7c39bfd..8094151 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java @@ -19,6 +19,12 @@ package org.apache.tubemq.server.common.offsetstorage; import java.net.BindException; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.server.broker.exception.OffsetStoreException; import org.apache.tubemq.server.common.TServerConstants; @@ -42,7 +48,7 @@ public class ZkOffsetStorage implements OffsetStorage { public void uncaughtException(Thread t, Throwable e) { if (e instanceof BindException) { logger.error("Bind failed.", e); - System.exit(1); + // System.exit(1); } if (e instanceof IllegalStateException && e.getMessage().contains("Shutdown in progress")) { @@ -56,27 +62,33 @@ public class ZkOffsetStorage implements OffsetStorage { private final String tubeZkRoot; private final String consumerZkDir; + private final boolean isBroker; + private final int brokerId; private ZKConfig zkConfig; private ZooKeeperWatcher zkw; + // group-topic-brokerid + private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>(); + // group-topic + private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>(); - /** - * Constructor of ZkOffsetStorage - * - * @param zkConfig the zookeeper configuration - */ - public ZkOffsetStorage(final ZKConfig zkConfig) { + + public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) { this.zkConfig = zkConfig; + this.brokerId = brokerId; + this.isBroker = isBroker; this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot()); this.consumerZkDir = this.tubeZkRoot + "/consumers-v3"; try { this.zkw = new ZooKeeperWatcher(zkConfig); } catch (Throwable e) { logger.error(new StringBuilder(256) - .append("Failed to connect ZooKeeper server (") + .append("[ZkOffsetStorage] Failed to connect ZooKeeper server (") .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e); System.exit(1); } - logger.info("ZooKeeper Offset Storage initiated!"); + logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper"); + queryAllZKGroupTopicInfo(); + logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!"); } @Override @@ -90,6 +102,16 @@ public class ZkOffsetStorage implements OffsetStorage { } @Override + public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() { + return zkGroupTopicBrokerInfos; + } + + @Override + public Map<String, Set<String>> getZkLocalGroupTopicInfos() { + return zkLocalGroupTopicInfos; + } + + @Override public void commitOffset(final String group, final Collection<OffsetStorageInfo> offsetInfoList, boolean isFailRetry) { @@ -125,10 +147,11 @@ public class ZkOffsetStorage implements OffsetStorage { } @Override - public OffsetStorageInfo loadOffset(final String group, final String topic, int brokerId, int partitionId) { + public OffsetStorageInfo loadOffset(final String group, final String topic, int partitionId) { String znode = new StringBuilder(512).append(this.consumerZkDir).append("/") .append(group).append("/offsets/").append(topic).append("/") - .append(brokerId).append(TokenConstants.HYPHEN).append(partitionId).toString(); + .append(brokerId).append(TokenConstants.HYPHEN) + .append(partitionId).toString(); String offsetZkInfo; try { offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode); @@ -183,6 +206,100 @@ public class ZkOffsetStorage implements OffsetStorage { } } + /** + * Get offset stored in zookeeper, if not found or error, set null + * <p/> + * + * @return partitionId--offset map info + */ + @Override + public Map<Integer, Long> queryGroupOffsetInfo(String group, String topic, + Set<Integer> partitionIds) { + StringBuilder sBuider = new StringBuilder(512); + String basePath = sBuider.append(this.consumerZkDir).append("/") + .append(group).append("/offsets/").append(topic).append("/") + .append(brokerId).append(TokenConstants.HYPHEN).toString(); + sBuider.delete(0, sBuider.length()); + String offsetZkInfo = null; + Map<Integer, Long> offsetMap = new HashMap<>(partitionIds.size()); + for (Integer partitionId : partitionIds) { + String offsetNode = sBuider.append(basePath).append(partitionId).toString(); + sBuider.delete(0, sBuider.length()); + try { + offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, offsetNode); + if (offsetZkInfo == null) { + offsetMap.put(partitionId, null); + } else { + String[] offsetInfoStrs = + offsetZkInfo.split(TokenConstants.HYPHEN); + offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1])); + } + } catch (Throwable e) { + offsetMap.put(partitionId, null); + } + } + return offsetMap; + } + + /** + * Get group-topic-brokerid map info stored in zookeeper. + * <p/> + * The broker only cares about the content of its own node, + * so this part only queries when the node starts, and + * caches relevant data in the memory for finding + * + */ + private void queryAllZKGroupTopicInfo() { + StringBuilder sBuider = new StringBuilder(512); + // get all booked groups name + String groupNode = sBuider.append(this.consumerZkDir).toString(); + List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode); + sBuider.delete(0, sBuider.length()); + if (bookedGroups != null) { + // get topic info by group + for (String group : bookedGroups) { + String topicNode = sBuider.append(groupNode) + .append("/").append(group).append("/offsets").toString(); + List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode); + sBuider.delete(0, sBuider.length()); + Set<String> topicSet = new HashSet<>(); + Map<String, Set<String>> topicBrokerSet = new HashMap<>(); + if (consumeTopics != null) { + // get broker info by topic + for (String topic : consumeTopics) { + String brokerNode = sBuider.append(topicNode) + .append("/").append(topic).toString(); + List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode); + sBuider.delete(0, sBuider.length()); + Set<String> brokerIdSet = new HashSet<>(); + if (brokerIds != null) { + for (String idStr : brokerIds) { + if (idStr != null) { + String[] brokerPartIdStrs = + idStr.split(TokenConstants.HYPHEN); + brokerIdSet.add(brokerPartIdStrs[0]); + } + } + if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) { + topicSet.add(topic); + } + } + topicBrokerSet.put(topic, brokerIdSet); + } + } + if (!topicSet.isEmpty()) { + zkLocalGroupTopicInfos.put(group, topicSet); + } + zkGroupTopicBrokerInfos.put(group, topicBrokerSet); + } + } + logger.info(new StringBuilder(256) + .append("[ZkOffsetStorage] query from zookeeper, total group size = ") + .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ") + .append(zkLocalGroupTopicInfos.size()).toString()); + } + + private String normalize(final String root) { if (root.startsWith("/")) { return this.removeLastSlash(root); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java index 77d5436..da86191 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java @@ -19,6 +19,8 @@ package org.apache.tubemq.server.common.offsetstorage.zookeeper; import java.io.IOException; import java.util.ArrayList; +import java.util.List; + import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.server.common.fileconfig.ZKConfig; @@ -147,6 +149,23 @@ public class ZKUtil { return getDataInternal(zkw, znode, null, true); } + /** + * Get the children data at the specified znode. + * <p/> + * Returns the children data. Returns null if + * the node does not exist or there is an exception. + * + * @param zkw zk reference + * @param znode path of node + * @return children data of the specified znode, or null + */ + public static List<String> getChildren(ZooKeeperWatcher zkw, String znode) { + try { + return zkw.getRecoverableZooKeeper().getChildren(znode, false); + } catch (Throwable e) { + return null; + } + } private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat, boolean watcherSet) throws KeeperException { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java index 34570de..cc6f681 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java @@ -176,7 +176,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { this.consumerEventManager = new ConsumerEventManager(consumerHolder); this.topicPSInfoManager = new TopicPSInfoManager(); this.loadBalancer = new DefaultLoadBalancer(); - this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig()); + this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(), + false, TBaseConstants.META_VALUE_UNDEFINED); this.heartbeatManager = new HeartbeatManager(); heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(), new TimeoutListener() {
