This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit e3d818c2759bef5f6d84ebeb13a212b4e7493fd9 Author: gosonzhang <[email protected]> AuthorDate: Wed Jan 6 11:27:15 2021 +0800 [TUBEMQ-486]Add the delete API of consumer group offset --- .../server/broker/offset/DefaultOffsetManager.java | 104 ++++- .../tubemq/server/broker/offset/OffsetService.java | 4 + .../server/broker/web/BrokerAdminServlet.java | 430 +++++++++++++-------- .../tubemq/server/common/fielddef/WebFieldDef.java | 6 +- .../tubemq/server/common/fileconfig/ZKConfig.java | 1 - .../server/common/offsetstorage/OffsetStorage.java | 7 +- .../common/offsetstorage/ZkOffsetStorage.java | 175 ++++++--- .../common/offsetstorage/zookeeper/ZKUtil.java | 14 + .../tubemq/server/common/utils/ProcessResult.java | 3 + .../server/common/utils/WebParameterUtils.java | 278 ++++++------- .../web/handler/WebBrokerTopicConfHandler.java | 21 +- 11 files changed, 661 insertions(+), 382 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index 84dabb2..f052375 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -17,6 +17,7 @@ package org.apache.tubemq.server.broker.offset; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -339,7 +340,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse Set<String> groupSet = new HashSet<>(); groupSet.addAll(cfmOffsetMap.keySet()); Map<String, Set<String>> localGroups = - zkOffsetStorage.getZkLocalGroupTopicInfos(); + zkOffsetStorage.queryZkAllGroupTopicInfos(); groupSet.addAll(localGroups.keySet()); return groupSet; } @@ -364,7 +365,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse public Set<String> getUnusedGroupInfo() { Set<String> unUsedGroups = new HashSet<>(); Map<String, Set<String>> localGroups = - zkOffsetStorage.getZkLocalGroupTopicInfos(); + zkOffsetStorage.queryZkAllGroupTopicInfos(); for (String groupName : localGroups.keySet()) { if (!cfmOffsetMap.containsKey(groupName)) { unUsedGroups.add(groupName); @@ -383,9 +384,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse Set<String> result = new HashSet<>(); Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group); if (topicPartOffsetMap == null) { - Map<String, Set<String>> localGroups = - zkOffsetStorage.getZkLocalGroupTopicInfos(); - result = localGroups.get(group); + List<String> groupLst = new ArrayList<>(1); + groupLst.add(group); + Map<String, Set<String>> groupTopicInfo = + zkOffsetStorage.queryZKGroupTopicInfo(groupLst); + result = groupTopicInfo.get(group); } else { for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) { result.add(storageInfo.getTopic()); @@ -496,6 +499,53 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse } /*** + * Delete offset. + * + * @param onlyMemory + * @param groupTopicPartMap + * @param modifier + */ + @Override + public void deleteGroupOffset(boolean onlyMemory, + Map<String, Map<String, Set<Integer>>> groupTopicPartMap, + String modifier) { + String printBase; + StringBuilder strBuidler = new StringBuilder(512); + for (Map.Entry<String, Map<String, Set<Integer>>> entry + : groupTopicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + rmvOffset(entry.getKey(), entry.getValue()); + } + if (onlyMemory) { + printBase = strBuidler + .append("[Offset Manager] delete offset from memory by modifier=") + .append(modifier).toString(); + } else { + zkOffsetStorage.deleteGroupOffsetInfo(groupTopicPartMap); + printBase = strBuidler + .append("[Offset Manager] delete offset from memory and zk by modifier=") + .append(modifier).toString(); + } + strBuidler.delete(0, strBuidler.length()); + // print log + for (Map.Entry<String, Map<String, Set<Integer>>> entry + : groupTopicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + logger.info(strBuidler.append(printBase).append(",group=").append(entry.getKey()) + .append(",topic-partId-map=").append(entry.getValue()).toString()); + strBuidler.delete(0, strBuidler.length()); + } + } + + /*** * Set temp offset. * * @param group @@ -611,6 +661,50 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse return regInfo; } + private void rmvOffset(String group, Map<String, Set<Integer>> topicPartMap) { + if (group == null + || topicPartMap == null + || topicPartMap.isEmpty()) { + return; + } + // remove confirm offset + ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group); + if (regInfoMap != null) { + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + for (Integer partitionId : entry.getValue()) { + String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId); + regInfoMap.remove(offsetCacheKey); + } + } + if (regInfoMap.isEmpty()) { + cfmOffsetMap.remove(group); + } + } + // remove tmp offset + ConcurrentHashMap<String, Long> tmpRegInfoMap = tmpOffsetMap.get(group); + if (tmpRegInfoMap != null) { + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + for (Integer partitionId : entry.getValue()) { + String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId); + tmpRegInfoMap.remove(offsetCacheKey); + } + } + if (tmpRegInfoMap.isEmpty()) { + tmpOffsetMap.remove(group); + } + } + } + private String getOffsetCacheKey(String topic, int partitionId) { return new StringBuilder(256).append(topic) .append("-").append(partitionId).toString(); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java index 9dcd29a..4a19798 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java @@ -72,4 +72,8 @@ public interface OffsetService { boolean modifyGroupOffset(Set<String> groups, List<Tuple3<String, Integer, Long>> topicPartOffsets, String modifier); + + void deleteGroupOffset(boolean onlyMemory, + Map<String, Map<String, Set<Integer>>> groupTopicPartMap, + String modifier); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index c76a6b7..1f2fb5d 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -95,10 +95,13 @@ public class BrokerAdminServlet extends AbstractWebHandler { // set or update group's offset info innRegisterWebMethod("admin_set_offset", "adminSetGroupOffSet"); + // remove group's offset info + innRegisterWebMethod("admin_rmv_offset", + "adminRemoveGroupOffSet"); } public void adminQueryAllMethods(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { + StringBuilder sBuilder) { int index = 0; List<String> methods = getSupportedMethod(); sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); @@ -120,11 +123,11 @@ public class BrokerAdminServlet extends AbstractWebHandler { * @throws Exception */ public void adminQueryBrokerAllConsumerInfo(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { + StringBuilder sBuilder) { int index = 0; - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSGROUPNAME, false, null); - if (!result.success) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -209,10 +212,10 @@ public class BrokerAdminServlet extends AbstractWebHandler { * @throws Exception */ public void adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { + StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -276,17 +279,16 @@ public class BrokerAdminServlet extends AbstractWebHandler { * @throws Exception */ public void adminGetMemStoreStatisInfo(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { + StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } Set<String> topicNameSet = (Set<String>) result.retData1; - result = WebParameterUtils.getBooleanParamValue(req, - WebFieldDef.NEEDREFRESH, false, false); - if (!result.success) { + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.NEEDREFRESH, false, false, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -335,38 +337,34 @@ public class BrokerAdminServlet extends AbstractWebHandler { * @throws Exception */ public void adminManualSetCurrentOffSet(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.TOPICNAME, true, null); - if (!result.success) { + StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String topicName = (String) result.retData1; - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.GROUPNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.GROUPNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String groupName = (String) result.retData1; - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.MODIFYUSER, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String modifyUser = (String) result.retData1; - result = WebParameterUtils.getIntParamValue(req, - WebFieldDef.PARTITIONID, true, -1, 0); - if (!result.success) { + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } int partitionId = (Integer) result.retData1; - result = WebParameterUtils.getLongParamValue(req, - WebFieldDef.MANUALOFFSET, true, -1); - if (!result.success) { + if (!WebParameterUtils.getLongParamValue(req, + WebFieldDef.MANUALOFFSET, true, -1, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -429,23 +427,21 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminQuerySnapshotMessageSet(HttpServletRequest req, StringBuilder sBuilder) throws Exception { - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.TOPICNAME, true, null); - if (!result.success) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String topicName = (String) result.retData1; - result = WebParameterUtils.getIntParamValue(req, - WebFieldDef.PARTITIONID, true, -1, 0); - if (!result.success) { + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } int partitionId = (Integer) result.retData1; - result = WebParameterUtils.getIntParamValue(req, - WebFieldDef.MSGCOUNT, false, 3, 3); - if (!result.success) { + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.MSGCOUNT, false, 3, 3, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -457,9 +453,8 @@ public class BrokerAdminServlet extends AbstractWebHandler { .append("\"}"); return; } - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.FILTERCONDS, false, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.FILTERCONDS, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -477,31 +472,27 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminQueryCurrentGroupOffSet(HttpServletRequest req, StringBuilder sBuilder) { - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.TOPICNAME, true, null); - if (!result.success) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String topicName = (String) result.retData1; - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.GROUPNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.GROUPNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String groupName = (String) result.retData1; - result = WebParameterUtils.getIntParamValue(req, - WebFieldDef.PARTITIONID, true, -1, 0); - if (!result.success) { + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } int partitionId = (Integer) result.retData1; - - result = WebParameterUtils.getBooleanParamValue(req, - WebFieldDef.REQUIREREALOFFSET, false, false); - if (!result.success) { + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.REQUIREREALOFFSET, false, false, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -567,8 +558,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { Map<String, ConsumerNodeInfo> map = broker.getBrokerServiceServer().getConsumerRegisterMap(); int totalCnt = 0; - sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",") - .append(",\"dataSet\":["); + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) { if (entry.getKey() == null || entry.getValue() == null) { continue; @@ -592,10 +582,10 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminQueryPubInfo(HttpServletRequest req, StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); // get the topic set to be queried - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -637,9 +627,9 @@ public class BrokerAdminServlet extends AbstractWebHandler { public void adminQueryBookedGroup(HttpServletRequest req, StringBuilder sBuilder) { // get divide info - ProcessResult result = WebParameterUtils.getBooleanParamValue(req, - WebFieldDef.WITHDIVIDE, false, false); - if (!result.success) { + ProcessResult result = new ProcessResult(); + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.WITHDIVIDE, false, false, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -697,16 +687,24 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminQueryGroupOffSet(HttpServletRequest req, StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); // get group list - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSGROUPNAME, false, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } + Set<String> inGroupNameSet = (Set<String>) result.retData1; + // get the topic set to be queried + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // get target consume group name + Set<String> topicSet = (Set<String>) result.retData1; // filter invalid groups Set<String> qryGroupNameSet = new HashSet<>(); - Set<String> inGroupNameSet = (Set<String>) result.retData1; Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups(); if (inGroupNameSet.isEmpty()) { qryGroupNameSet = bookedGroupSet; @@ -717,19 +715,10 @@ public class BrokerAdminServlet extends AbstractWebHandler { } } } - // get the topic set to be queried - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { - WebParameterUtils.buildFailResult(sBuilder, result.errInfo); - return; - } - // get target consume group name - Set<String> topicSet = (Set<String>) result.retData1; // verify the acquired Topic set and // query the corresponding offset information Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = - getGroupOffsetInfo(qryGroupNameSet, topicSet); + getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, qryGroupNameSet, topicSet); // builder result int totalCnt = 0; sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); @@ -772,26 +761,24 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminSetGroupOffSet(HttpServletRequest req, StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); // get group list - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSGROUPNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } Set<String> groupNameSet = (Set<String>) result.retData1; // get set mode - result = WebParameterUtils.getBooleanParamValue(req, - WebFieldDef.MANUALSET, true, false); - if (!result.success) { + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.MANUALSET, true, false, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } boolean manualSet = (Boolean) result.retData1; // get modify user - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.MODIFYUSER, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -799,17 +786,15 @@ public class BrokerAdminServlet extends AbstractWebHandler { final String modifier = (String) result.retData1; if (manualSet) { // get offset json info - result = WebParameterUtils.getJsonDictParamValue(req, - WebFieldDef.OFFSETJSON, true, null); - if (!result.success) { + if (!WebParameterUtils.getJsonDictParamValue(req, + WebFieldDef.OFFSETJSON, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } Map<String, Long> manOffsets = (Map<String, Long>) result.retData1; // valid and transfer offset format - result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets); - if (!result.success) { + if (!validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -817,9 +802,8 @@ public class BrokerAdminServlet extends AbstractWebHandler { (List<Tuple3<String, Integer, Long>>) result.retData1; } else { // get the topic set to be set - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } @@ -840,38 +824,43 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminCloneGroupOffSet(HttpServletRequest req, StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); // get source consume group name - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.SRCGROUPNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.SRCGROUPNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } final String srcGroupName = (String) result.retData1; - // get modify user - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.MODIFYUSER, true, null); - if (!result.success) { + // get source consume group's topic set cloned to target group + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } - final String modifier = (String) result.retData1; - // get source consume group's topic set cloned to target group - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { + Set<String> srcTopicNameSet = (Set<String>) result.retData1; + // valid topic and get topic's partitionIds + if (!validAndGetTopicPartInfo(srcGroupName, + WebFieldDef.SRCGROUPNAME, srcTopicNameSet, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } + Map<String, Set<Integer>> topicPartMap = + (Map<String, Set<Integer>>) result.retData1; // get target consume group name - Set<String> srcTopicNameSet = (Set<String>) result.retData1; - result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.TGTCOMPSGROUPNAME, true, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.TGTCOMPSGROUPNAME, true, null, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return; } Set<String> tgtGroupNameSet = (Set<String>) result.retData1; + // get modify user + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + final String modifier = (String) result.retData1; // check sourceGroup if existed Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups(); if (!bookedGroups.contains(srcGroupName)) { @@ -882,16 +871,6 @@ public class BrokerAdminServlet extends AbstractWebHandler { .append(" has not been registered on this Broker!").toString()); return; } - // valid topic and get topic's partitionIds - Map<String, Set<Integer>> topicPartMap = - validAndGetPartitions(srcGroupName, srcTopicNameSet); - if (topicPartMap.isEmpty()) { - WebParameterUtils.buildFailResult(sBuilder, - new StringBuilder(512).append("Parameter ") - .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ") - .append(srcGroupName).append(" subscribed topic set!").toString()); - return; - } // query offset from source group Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets = broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap); @@ -904,6 +883,56 @@ public class BrokerAdminServlet extends AbstractWebHandler { sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } + /*** + * Remove consume group offset. + * + * @param req + * @param sBuilder process result + */ + public void adminRemoveGroupOffSet(HttpServletRequest req, + StringBuilder sBuilder) { + ProcessResult result = new ProcessResult(); + // get consume group name + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, true, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Set<String> groupNameSet = (Set<String>) result.retData1; + // get modify user + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + final String modifier = (String) result.retData1; + // get need removed offset's topic + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // get target consume group name + Set<String> topicNameSet = (Set<String>) result.retData1; + // get set mode + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.ONLYMEM, false, false, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + boolean onlyMemory = (Boolean) result.retData1; + if (!validAndGetGroupTopicInfo(groupNameSet, topicNameSet, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Map<String, Map<String, Set<Integer>>> groupTopicPartMap = + (Map<String, Map<String, Set<Integer>>>) result.retData1; + broker.getOffsetManager().deleteGroupOffset( + onlyMemory, groupTopicPartMap, modifier); + // builder return result + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); + } + // build reset offset info private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo( Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) { @@ -950,8 +979,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { List<Tuple3<String, Integer, Long>> result = new ArrayList<>(); MessageStoreManager storeManager = broker.getStoreManager(); // get topic's partition set - Map<String, Set<Integer>> topicPartMap = - validAndGetPartitions(null, topicSet); + Map<String, Set<Integer>> topicPartMap = getTopicPartitions(topicSet); // fill current topic's max offset value for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { if (entry.getKey() == null @@ -979,15 +1007,15 @@ public class BrokerAdminServlet extends AbstractWebHandler { } // build reset offset info - private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef, - Map<String, Long> manOffsetInfoMap) { + private boolean validManOffsetResetInfo(WebFieldDef fieldDef, + Map<String, Long> manOffsetInfoMap, + ProcessResult result) { String brokerId; String topicName; String strPartId; int partitionId; long adjOffset; MessageStore store = null; - ProcessResult procResult = new ProcessResult(); MessageStoreManager storeManager = broker.getStoreManager(); List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>(); String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId()); @@ -1001,12 +1029,12 @@ public class BrokerAdminServlet extends AbstractWebHandler { // parse and check partitionKey value String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP); if (keyItems.length != 3) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append("'s key invalid:") .append(entry.getKey()) .append(" must be brokerId:topicName:partitionId !").toString()); - return procResult; + return result.success; } brokerId = keyItems[0].trim(); topicName = keyItems[1].trim(); @@ -1018,12 +1046,12 @@ public class BrokerAdminServlet extends AbstractWebHandler { try { partitionId = Integer.parseInt(strPartId); } catch (NumberFormatException e) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append("'s key invalid:") .append(entry.getKey()) .append("'s partitionId value not number!").toString()); - return procResult; + return result.success; } // check and adjust offset value try { @@ -1041,65 +1069,129 @@ public class BrokerAdminServlet extends AbstractWebHandler { offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset)); } if (offsetVals.isEmpty()) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") - .append(fieldDef.name) - .append("'s value is invalid!").toString()); + .append(fieldDef.name).append("'s value is invalid!").toString()); } else { - procResult.setSuccResult(offsetVals); + result.setSuccResult(offsetVals); } - return procResult; + return result.success; } // builder group's offset info private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo( - Set<String> groupSet, Set<String> topicSet) { - long curReadDataOffset = -2; - long curDataLag = -2; + WebFieldDef groupFldDef, Set<String> groupSet, Set<String> topicSet) { + ProcessResult result = new ProcessResult(); Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>(); for (String group : groupSet) { Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>(); // valid and get topic's partitionIds - Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet); - // get topic's publish info - Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap = - broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet()); - // get group's booked offset info - Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap = - broker.getOffsetManager().queryGroupOffset(group, topicPartMap); - // get offset info array - for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { - String topic = entry.getKey(); - Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>(); - Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic); - Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic); - for (Integer partitionId : entry.getValue()) { - GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId); - offsetInfo.setPartPubStoreInfo(storeInfoMap.get(partitionId)); - offsetInfo.setConsumeOffsetInfo(partBookedMap.get(partitionId)); - String queryKey = buildQueryID(group, topic, partitionId); - ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey); - if (nodeInfo != null) { - offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset()); + if (validAndGetTopicPartInfo(group, groupFldDef, topicSet, result)) { + Map<String, Set<Integer>> topicPartMap = + (Map<String, Set<Integer>>) result.retData1; + // get topic's publish info + Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap = + broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet()); + // get group's booked offset info + Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap = + broker.getOffsetManager().queryGroupOffset(group, topicPartMap); + // get offset info array + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + String topic = entry.getKey(); + Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>(); + Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic); + Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic); + for (Integer partitionId : entry.getValue()) { + GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId); + offsetInfo.setPartPubStoreInfo( + storeInfoMap == null ? null :storeInfoMap.get(partitionId)); + offsetInfo.setConsumeOffsetInfo( + partBookedMap == null ? null : partBookedMap.get(partitionId)); + String queryKey = buildQueryID(group, topic, partitionId); + ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey); + if (nodeInfo != null) { + offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset()); + } + offsetInfo.calculateLag(); + partOffsetRet.put(partitionId, offsetInfo); } - offsetInfo.calculateLag(); - partOffsetRet.put(partitionId, offsetInfo); + topicOffsetRet.put(topic, partOffsetRet); } - topicOffsetRet.put(topic, partOffsetRet); } groupOffsetMaps.put(group, topicOffsetRet); } return groupOffsetMaps; } + // valid and get need removed group-topic info + private boolean validAndGetGroupTopicInfo(Set<String> groupSet, + Set<String> topicSet, + ProcessResult result) { + Map<String, Map<String, Set<Integer>>> groupTopicPartMap = new HashMap<>(); + // filter group + Set<String> targetGroupSet = new HashSet<>(); + Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups(); + for (String orgGroup : groupSet) { + if (bookedGroups.contains(orgGroup)) { + targetGroupSet.add(orgGroup); + } + } + // valid specified topic set + for (String group : targetGroupSet) { + if (validAndGetTopicPartInfo(group, WebFieldDef.GROUPNAME, topicSet, result)) { + Map<String, Set<Integer>> topicPartMap = + (Map<String, Set<Integer>>) result.retData1; + groupTopicPartMap.put(group, topicPartMap); + } + } + result.setSuccResult(groupTopicPartMap); + return true; + } - private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) { - Map<String, Set<Integer>> topicPartMap = new HashMap<>(); - // query stored topic set stored in memory or zk - if (topicSet.isEmpty() && group != null) { - topicSet = broker.getOffsetManager().getGroupSubInfo(group); + private boolean validAndGetTopicPartInfo(String groupName, + WebFieldDef groupFldDef, + Set<String> topicSet, + ProcessResult result) { + Set<String> subTopicSet = + broker.getOffsetManager().getGroupSubInfo(groupName); + if (subTopicSet == null || subTopicSet.isEmpty()) { + result.setFailResult(400, new StringBuilder(512) + .append("Parameter ").append(groupFldDef.name) + .append(": subscribed topic set of ").append(groupName) + .append(" query result is null!").toString()); + return result.success; } - // get topic's partitionIds + // filter valid topic set + Set<String> tgtTopicSet = new HashSet<>(); + if (topicSet.isEmpty()) { + tgtTopicSet = subTopicSet; + } else { + for (String topic : topicSet) { + if (subTopicSet.contains(topic)) { + tgtTopicSet.add(topic); + } + } + if (tgtTopicSet.isEmpty()) { + result.setFailResult(400, new StringBuilder(512) + .append("Parameter ").append(groupFldDef.name) + .append(": ").append(groupName) + .append(" unsubscribed to the specified topic set!").toString()); + return result.success; + } + } + Map<String, Set<Integer>> topicPartMap = getTopicPartitions(tgtTopicSet); + if (topicPartMap.isEmpty()) { + result.setFailResult(400, new StringBuilder(512) + .append("Parameter ").append(groupFldDef.name) + .append(": all topics subscribed by the group have been deleted!").toString()); + return result.success; + } + result.setSuccResult(topicPartMap); + return result.success; + } + + private Map<String, Set<Integer>> getTopicPartitions(Set<String> topicSet) { + Map<String, Set<Integer>> topicPartMap = new HashMap<>(); if (topicSet != null) { Map<String, TopicMetadata> topicConfigMap = broker.getMetadataManager().getTopicConfigMap(); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java index 45b862d..a65a223 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -81,8 +81,10 @@ public enum WebFieldDef { TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP), MANUALSET(20, "manualSet", "manSet", WebFieldType.BOOLEAN, "Whether manual offset setting mode"), - OFFSETJSON(21, "offsetJsonSet", "offsetSet", - WebFieldType.JSONTYPE, "The offset set that needs to be added or modified"); + OFFSETJSON(21, "offsetJsonInfo", "offsetInfo", + WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"), + ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN, + "Only clear the offset data in the memory cache, default is false"); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java index 5ac3683..8e677a5 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java @@ -28,7 +28,6 @@ public class ZKConfig { private int zkSyncTimeMs = 1000; private long zkCommitPeriodMs = 5000L; private int zkCommitFailRetries = TServerConstants.CFG_ZK_COMMIT_DEFAULT_RETRIES; - public ZKConfig() { } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java index dca7ca8..f5eee40 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java @@ -18,6 +18,7 @@ package org.apache.tubemq.server.common.offsetstorage; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; @@ -33,10 +34,12 @@ public interface OffsetStorage { final Collection<OffsetStorageInfo> offsetInfoList, boolean isFailRetry); - Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos(); + Map<String, Set<String>> queryZkAllGroupTopicInfos(); - Map<String, Set<String>> getZkLocalGroupTopicInfos(); + Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet); Map<Integer, Long> queryGroupOffsetInfo(String group, String topic, Set<Integer> partitionIds); + + void deleteGroupOffsetInfo(Map<String, Map<String, Set<Integer>>> groupTopicPartMap); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java index 8094151..3700753 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java @@ -35,6 +35,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + + /** * A offset storage implementation with zookeeper */ @@ -64,18 +66,16 @@ public class ZkOffsetStorage implements OffsetStorage { private final String consumerZkDir; private final boolean isBroker; private final int brokerId; + private final String strBrokerId; private ZKConfig zkConfig; private ZooKeeperWatcher zkw; - // group-topic-brokerid - private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>(); - // group-topic - private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>(); public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) { this.zkConfig = zkConfig; - this.brokerId = brokerId; this.isBroker = isBroker; + this.brokerId = brokerId; + this.strBrokerId = String.valueOf(brokerId); this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot()); this.consumerZkDir = this.tubeZkRoot + "/consumers-v3"; try { @@ -86,8 +86,6 @@ public class ZkOffsetStorage implements OffsetStorage { .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e); System.exit(1); } - logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper"); - queryAllZKGroupTopicInfo(); logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!"); } @@ -102,16 +100,6 @@ public class ZkOffsetStorage implements OffsetStorage { } @Override - public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() { - return zkGroupTopicBrokerInfos; - } - - @Override - public Map<String, Set<String>> getZkLocalGroupTopicInfos() { - return zkLocalGroupTopicInfos; - } - - @Override public void commitOffset(final String group, final Collection<OffsetStorageInfo> offsetInfoList, boolean isFailRetry) { @@ -242,63 +230,130 @@ public class ZkOffsetStorage implements OffsetStorage { } /** - * Get group-topic-brokerid map info stored in zookeeper. - * <p/> - * The broker only cares about the content of its own node, - * so this part only queries when the node starts, and - * caches relevant data in the memory for finding - * + * Query booked topic info of groups stored in zookeeper. + * @param groupSet query groups + * @return group--topic map info */ - private void queryAllZKGroupTopicInfo() { + @Override + public Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet) { + String qryBrokerId; + Map<String, Set<String>> groupTopicMap = new HashMap<>(); StringBuilder sBuider = new StringBuilder(512); - // get all booked groups name + if (groupSet == null || groupSet.isEmpty()) { + return groupTopicMap; + } + // build path base String groupNode = sBuider.append(this.consumerZkDir).toString(); - List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode); sBuider.delete(0, sBuider.length()); - if (bookedGroups != null) { - // get topic info by group - for (String group : bookedGroups) { - String topicNode = sBuider.append(groupNode) - .append("/").append(group).append("/offsets").toString(); - List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode); - sBuider.delete(0, sBuider.length()); - Set<String> topicSet = new HashSet<>(); - Map<String, Set<String>> topicBrokerSet = new HashMap<>(); - if (consumeTopics != null) { - // get broker info by topic - for (String topic : consumeTopics) { - String brokerNode = sBuider.append(topicNode) - .append("/").append(topic).toString(); - List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode); - sBuider.delete(0, sBuider.length()); - Set<String> brokerIdSet = new HashSet<>(); - if (brokerIds != null) { - for (String idStr : brokerIds) { - if (idStr != null) { - String[] brokerPartIdStrs = - idStr.split(TokenConstants.HYPHEN); - brokerIdSet.add(brokerPartIdStrs[0]); + // get the group managed by this broker + for (String group : groupSet) { + String topicNode = sBuider.append(groupNode) + .append("/").append(group).append("/offsets").toString(); + List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode); + sBuider.delete(0, sBuider.length()); + Set<String> topicSet = new HashSet<>(); + if (consumeTopics != null) { + for (String topic : consumeTopics) { + if (topic == null) { + continue; + } + String brokerNode = sBuider.append(topicNode) + .append("/").append(topic).toString(); + List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode); + sBuider.delete(0, sBuider.length()); + if (brokerIds != null) { + for (String idStr : brokerIds) { + if (idStr != null) { + String[] brokerPartIdStrs = + idStr.split(TokenConstants.HYPHEN); + qryBrokerId = brokerPartIdStrs[0]; + if (qryBrokerId != null + && strBrokerId.equals(qryBrokerId.trim())) { + topicSet.add(topic); + break; } } - if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) { - topicSet.add(topic); - } } - topicBrokerSet.put(topic, brokerIdSet); } } - if (!topicSet.isEmpty()) { - zkLocalGroupTopicInfos.put(group, topicSet); + } + if (!topicSet.isEmpty()) { + groupTopicMap.put(group, topicSet); + } + } + return groupTopicMap; + } + + /** + * Get group-topic map info stored in zookeeper. + * <p/> + * The broker only cares about the content of its own node + * + */ + @Override + public Map<String, Set<String>> queryZkAllGroupTopicInfos() { + StringBuilder sBuider = new StringBuilder(512); + // get all booked groups name + String groupNode = sBuider.append(this.consumerZkDir).toString(); + List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode); + return queryZKGroupTopicInfo(bookedGroups); + } + + /** + * Get offset stored in zookeeper, if not found or error, set null + * <p/> + * + * @return partitionId--offset map info + */ + @Override + public void deleteGroupOffsetInfo( + Map<String, Map<String, Set<Integer>>> groupTopicPartMap) { + StringBuilder sBuider = new StringBuilder(512); + for (Map.Entry<String, Map<String, Set<Integer>>> entry + : groupTopicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + String basePath = sBuider.append(this.consumerZkDir).append("/") + .append(entry.getKey()).append("/offsets").toString(); + sBuider.delete(0, sBuider.length()); + Map<String, Set<Integer>> topicPartMap = entry.getValue(); + for (Map.Entry<String, Set<Integer>> topicEntry : topicPartMap.entrySet()) { + if (topicEntry.getKey() == null + || topicEntry.getValue() == null + || topicEntry.getValue().isEmpty()) { + continue; + } + Set<Integer> partIdSet = topicEntry.getValue(); + for (Integer partitionId : partIdSet) { + String offsetNode = sBuider.append(basePath).append("/") + .append(topicEntry.getKey()).append("/") + .append(brokerId).append(TokenConstants.HYPHEN) + .append(partitionId).toString(); + sBuider.delete(0, sBuider.length()); + ZKUtil.delZNode(this.zkw, offsetNode); } - zkGroupTopicBrokerInfos.put(group, topicBrokerSet); + String parentNode = sBuider.append(basePath).append("/") + .append(topicEntry.getKey()).toString(); + sBuider.delete(0, sBuider.length()); + chkAndRmvBlankParentNode(parentNode); } + chkAndRmvBlankParentNode(basePath); + String parentNode = sBuider.append(this.consumerZkDir) + .append("/").append(entry.getKey()).toString(); + sBuider.delete(0, sBuider.length()); + chkAndRmvBlankParentNode(parentNode); } - logger.info(new StringBuilder(256) - .append("[ZkOffsetStorage] query from zookeeper, total group size = ") - .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ") - .append(zkLocalGroupTopicInfos.size()).toString()); } + private void chkAndRmvBlankParentNode(String parentNode) { + List<String> nodeSet = ZKUtil.getChildren(zkw, parentNode); + if (nodeSet != null && nodeSet.isEmpty()) { + ZKUtil.delZNode(this.zkw, parentNode); + } + } private String normalize(final String root) { if (root.startsWith("/")) { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java index da86191..5eb6cea 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java @@ -167,6 +167,20 @@ public class ZKUtil { } } + /** + * delete the specified znode. + * + * @param zkw zk reference + * @param znode path of node + */ + public static void delZNode(ZooKeeperWatcher zkw, String znode) { + try { + zkw.getRecoverableZooKeeper().delete(znode, -1); + } catch (Throwable e) { + // + } + } + private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat, boolean watcherSet) throws KeeperException { try { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java index 3688b1c..8cabf67 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java @@ -44,10 +44,13 @@ public class ProcessResult { this.success = false; this.errCode = errCode; this.errInfo = errMsg; + this.retData1 = null; } public void setSuccResult(Object retData) { this.success = true; + this.errInfo = ""; + this.errCode = TErrCodeConstants.SUCCESS; this.retData1 = retData; } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index fddd5de..2318bdb 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -249,32 +249,32 @@ public class WebParameterUtils { * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object - * @return valid result for the parameter value + * @param result process result of parameter value + * @return process result */ - public static ProcessResult getLongParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required, - long defValue) { - ProcessResult procResult = - getStringParamValue(req, fieldDef, required, null); - if (!procResult.success) { - return procResult; - } - String paramValue = (String) procResult.retData1; + public static boolean getLongParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + long defValue, + ProcessResult result) { + if (!getStringParamValue(req, fieldDef, required, null, result)) { + return result.success; + } + String paramValue = (String) result.retData1; if (paramValue == null) { - procResult.setSuccResult(defValue); - return procResult; + result.setSuccResult(defValue); + return result.success; } try { long paramIntVal = Long.parseLong(paramValue); - procResult.setSuccResult(paramIntVal); + result.setSuccResult(paramIntVal); } catch (Throwable e) { - procResult.setFailResult(400, + result.setFailResult(400, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append(" parse error: ") .append(e.getMessage()).toString()); } - return procResult; + return result.success; } /** @@ -283,43 +283,44 @@ public class WebParameterUtils { * @param req Http Servlet Request * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required - * @return valid result for the parameter value + * @param result process result of parameter value + * @return process result */ - public static ProcessResult getIntParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required) { - ProcessResult procResult = - getStringParamValue(req, fieldDef, required, null); - if (!procResult.success) { - return procResult; + public static boolean getIntParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + ProcessResult result) { + if (!getStringParamValue(req, fieldDef, + required, null, result)) { + return result.success; } - ProcessResult procRet = new ProcessResult(); Set<Integer> tgtValueSet = new HashSet<Integer>(); if (fieldDef.isCompFieldType()) { - Set<String> valItemSet = (Set<String>) procResult.retData1; + Set<String> valItemSet = (Set<String>) result.retData1; if (valItemSet.isEmpty()) { - procResult.setSuccResult(tgtValueSet); - return procResult; + result.setSuccResult(tgtValueSet); + return result.success; } for (String itemVal : valItemSet) { - if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) { - return procRet; + if (!checkIntValueNorms(fieldDef, + itemVal, false, -1, result)) { + return result.success; } - tgtValueSet.add((Integer) procRet.retData1); + tgtValueSet.add((Integer) result.retData1); } } else { - String paramValue = (String) procResult.retData1; + String paramValue = (String) result.retData1; if (paramValue == null) { - procResult.setSuccResult(tgtValueSet); - return procResult; + result.setSuccResult(tgtValueSet); + return result.success; } - if (!checkIntValueNorms(procRet, - fieldDef, paramValue, false, -1)) { - tgtValueSet.add((Integer) procRet.retData1); + if (!checkIntValueNorms(fieldDef, + paramValue, false, -1, result)) { + tgtValueSet.add((Integer) result.retData1); } } - procResult.setSuccResult(tgtValueSet); - return procResult; + result.setSuccResult(tgtValueSet); + return result.success; } /** @@ -330,43 +331,44 @@ public class WebParameterUtils { * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object * @param minValue min value required - * @return valid result for the parameter value + * @param result process result of parameter value + * @return process result */ - public static ProcessResult getIntParamValue(HttpServletRequest req, + public static boolean getIntParamValue(HttpServletRequest req, WebFieldDef fieldDef, boolean required, int defValue, - int minValue) { - ProcessResult procResult = - getStringParamValue(req, fieldDef, required, null); - if (!procResult.success) { - return procResult; + int minValue, + ProcessResult result) { + if (!getStringParamValue(req, fieldDef, required, null, result)) { + return result.success; } if (fieldDef.isCompFieldType()) { Set<Integer> tgtValueSet = new HashSet<Integer>(); - Set<String> valItemSet = (Set<String>) procResult.retData1; + Set<String> valItemSet = (Set<String>) result.retData1; if (valItemSet.isEmpty()) { tgtValueSet.add(defValue); - procResult.setSuccResult(tgtValueSet); - return procResult; + result.setSuccResult(tgtValueSet); + return result.success; } - ProcessResult procRet = new ProcessResult(); for (String itemVal : valItemSet) { - if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) { - return procRet; + if (!checkIntValueNorms(fieldDef, + itemVal, true, minValue, result)) { + return result.success; } - tgtValueSet.add((Integer) procRet.retData1); + tgtValueSet.add((Integer) result.retData1); } - procResult.setSuccResult(tgtValueSet); + result.setSuccResult(tgtValueSet); } else { - String paramValue = (String) procResult.retData1; + String paramValue = (String) result.retData1; if (paramValue == null) { - procResult.setSuccResult(defValue); - return procResult; + result.setSuccResult(defValue); + return result.success; } - checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue); + checkIntValueNorms(fieldDef, + paramValue, true, minValue, result); } - return procResult; + return result.success; } /** @@ -376,24 +378,24 @@ public class WebParameterUtils { * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object + * @param result process result * @return valid result for the parameter value */ - public static ProcessResult getBooleanParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required, - boolean defValue) { - ProcessResult procResult = - getStringParamValue(req, fieldDef, required, null); - if (!procResult.success) { - return procResult; - } - String paramValue = (String) procResult.retData1; + public static boolean getBooleanParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + boolean defValue, + ProcessResult result) { + if (!getStringParamValue(req, fieldDef, required, null, result)) { + return result.success; + } + String paramValue = (String) result.retData1; if (paramValue == null) { - procResult.setSuccResult(defValue); - return procResult; + result.setSuccResult(defValue); + return result.success; } - procResult.setSuccResult(Boolean.parseBoolean(paramValue)); - return procResult; + result.setSuccResult(Boolean.parseBoolean(paramValue)); + return result.success; } /** @@ -403,13 +405,14 @@ public class WebParameterUtils { * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object + * @param result process result * @return valid result for the parameter value */ - public static ProcessResult getStringParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required, - String defValue) { - ProcessResult procResult = new ProcessResult(); + public static boolean getStringParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + String defValue, + ProcessResult result) { // get parameter value String paramValue = req.getParameter(fieldDef.name); if (paramValue == null) { @@ -422,14 +425,14 @@ public class WebParameterUtils { // Check if the parameter exists if (TStringUtils.isBlank(paramValue)) { if (required) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append(" is missing or value is null or blank!").toString()); } else { - procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue); + procStringDefValue(fieldDef.isCompFieldType(), defValue, result); } - return procResult; + return result.success; } // check if value is norm; if (fieldDef.isCompFieldType()) { @@ -440,41 +443,41 @@ public class WebParameterUtils { if (TStringUtils.isBlank(strParamValueItem)) { continue; } - if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) { - return procResult; + if (!checkStrValueNorms(fieldDef, strParamValueItem, result)) { + return result.success; } - valItemSet.add((String) procResult.retData1); + valItemSet.add((String) result.retData1); } // check if is empty result if (valItemSet.isEmpty()) { if (required) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append(" is missing or value is null or blank!").toString()); } else { - procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue); + procStringDefValue(fieldDef.isCompFieldType(), defValue, result); } - return procResult; + return result.success; } // check max item count if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) { if (valItemSet.size() > fieldDef.itemMaxCnt) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append("'s item count over max allowed count (") .append(fieldDef.itemMaxCnt).append(")!").toString()); } } - procResult.setSuccResult(valItemSet); + result.setSuccResult(valItemSet); } else { - if (!checkStrValueNorms(procResult, fieldDef, paramValue)) { - return procResult; + if (!checkStrValueNorms(fieldDef, paramValue, result)) { + return result.success; } - procResult.setSuccResult(paramValue); + result.setSuccResult(paramValue); } - return procResult; + return result.success; } /** @@ -484,13 +487,14 @@ public class WebParameterUtils { * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object + * @param result process result * @return valid result for the parameter value */ - public static ProcessResult getJsonDictParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required, - Map<String, Long> defValue) { - ProcessResult procResult = new ProcessResult(); + public static boolean getJsonDictParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + Map<String, Long> defValue, + ProcessResult result) { // get parameter value String paramValue = req.getParameter(fieldDef.name); if (paramValue == null) { @@ -503,20 +507,20 @@ public class WebParameterUtils { // Check if the parameter exists if (TStringUtils.isBlank(paramValue)) { if (required) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append(" is missing or value is null or blank!").toString()); } else { - procResult.setSuccResult(defValue); + result.setSuccResult(defValue); } - return procResult; + return result.success; } try { paramValue = URLDecoder.decode(paramValue, TBaseConstants.META_DEFAULT_CHARSET_NAME); } catch (UnsupportedEncodingException e) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append(" decode error, exception is ") @@ -524,73 +528,83 @@ public class WebParameterUtils { } if (TStringUtils.isBlank(paramValue)) { if (required) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append("'s value is blank!").toString()); } else { - procResult.setSuccResult(defValue); + result.setSuccResult(defValue); } - return procResult; + return result.success; } if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) { if (paramValue.length() > fieldDef.valMaxLen) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("Parameter ") .append(fieldDef.name) .append("'s length over max allowed length (") .append(fieldDef.valMaxLen).append(")!").toString()); - return procResult; + return result.success; } } - procResult.setSuccResult(new Gson().fromJson(paramValue, - new TypeToken<Map<String, Long>>(){}.getType())); - return procResult; + // parse data + try { + Map<String, Long> manOffsets = new Gson().fromJson(paramValue, + new TypeToken<Map<String, Long>>(){}.getType()); + result.setSuccResult(manOffsets); + } catch (Throwable e) { + result.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append(" value parse failure, error is ") + .append(e.getMessage()).append("!").toString()); + } + return result.success; } /** * process string default value * - * @param procResult process result * @param isCompFieldType the parameter if compound field type * @param defValue the parameter default value + * @param result process result * @return process result for default value of parameter */ - private static ProcessResult procStringDefValue(ProcessResult procResult, - boolean isCompFieldType, - String defValue) { + private static boolean procStringDefValue(boolean isCompFieldType, + String defValue, + ProcessResult result) { if (isCompFieldType) { Set<String> valItemSet = new HashSet<>(); if (TStringUtils.isNotBlank(defValue)) { valItemSet.add(defValue); } - procResult.setSuccResult(valItemSet); + result.setSuccResult(valItemSet); } else { - procResult.setSuccResult(defValue); + result.setSuccResult(defValue); } - return procResult; + return result.success; } /** * Parse the parameter string value by regex define * - * @param procResult process result * @param fieldDef the parameter field definition * @param paramVal the parameter value + * @param result process result * @return check result for string value of parameter */ - private static boolean checkStrValueNorms(ProcessResult procResult, - WebFieldDef fieldDef, - String paramVal) { + private static boolean checkStrValueNorms(WebFieldDef fieldDef, + String paramVal, + ProcessResult result) { paramVal = paramVal.trim(); if (TStringUtils.isBlank(paramVal)) { - procResult.setSuccResult(null); + result.setSuccResult(null); return true; } // check value's max length if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) { if (paramVal.length() > fieldDef.valMaxLen) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("over max length for ") .append(fieldDef.name).append(", only allow ") .append(fieldDef.valMaxLen).append(" length").toString()); @@ -600,44 +614,44 @@ public class WebParameterUtils { // check value's pattern if (fieldDef.regexCheck) { if (!paramVal.matches(fieldDef.regexDef.getPattern())) { - procResult.setFailResult(fieldDef.id, + result.setFailResult(fieldDef.id, new StringBuilder(512).append("illegal value for ") .append(fieldDef.name).append(", value ") .append(fieldDef.regexDef.getErrMsgTemp()).toString()); return false; } } - procResult.setSuccResult(paramVal); + result.setSuccResult(paramVal); return true; } /** * Parse the parameter string value by regex define * - * @param procResult process result * @param fieldDef the parameter field definition * @param paramValue the parameter value * @param hasMinVal whether there is a minimum * param minValue the parameter min value + * @param result process result * @return check result for string value of parameter */ - private static boolean checkIntValueNorms(ProcessResult procResult, - WebFieldDef fieldDef, + private static boolean checkIntValueNorms(WebFieldDef fieldDef, String paramValue, boolean hasMinVal, - int minValue) { + int minValue, + ProcessResult result) { try { int paramIntVal = Integer.parseInt(paramValue); if (hasMinVal && paramIntVal < minValue) { - procResult.setFailResult(400, + result.setFailResult(400, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append(" value must >= ") .append(minValue).toString()); return false; } - procResult.setSuccResult(paramIntVal); + result.setSuccResult(paramIntVal); } catch (Throwable e) { - procResult.setFailResult(400, + result.setFailResult(400, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append(" parse error: ") .append(e.getMessage()).toString()); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java index 3bdfe16..8bef5ab 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java @@ -207,7 +207,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB", req.getParameter("memCacheMsgSizeInMB"), false, defmemCacheMsgSizeInMB, 2); - memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB; + memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048); int memCacheFlushIntvl = WebParameterUtils.validIntDataParameter("memCacheFlushIntvl", req.getParameter("memCacheFlushIntvl"), @@ -354,7 +354,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB", jsonObject.get("memCacheMsgSizeInMB"), false, brokerConfEntity.getDftMemCacheMsgSizeInMB(), 2); - memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB; + memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048); int memCacheFlushIntvl = WebParameterUtils.validIntDataParameter("memCacheFlushIntvl", jsonObject.get("memCacheFlushIntvl"), @@ -714,10 +714,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { * @return */ public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) { + ProcessResult result = new ProcessResult(); StringBuilder strBuffer = new StringBuilder(512); - ProcessResult result = WebParameterUtils.getIntParamValue(req, - WebFieldDef.COMPSBROKERID, false); - if (!result.success) { + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.COMPSBROKERID, false, result)) { WebParameterUtils.buildFailResult(strBuffer, result.errInfo); return strBuffer; } @@ -752,17 +752,16 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { * @return */ public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) { + ProcessResult result = new ProcessResult(); StringBuilder strBuffer = new StringBuilder(512); - ProcessResult result = WebParameterUtils.getStringParamValue(req, - WebFieldDef.COMPSTOPICNAME, false, null); - if (!result.success) { + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { WebParameterUtils.buildFailResult(strBuffer, result.errInfo); return strBuffer; } Set<String> topicNameSet = (Set<String>) result.retData1; - result = WebParameterUtils.getBooleanParamValue(req, - WebFieldDef.WITHIP, false, false); - if (!result.success) { + if (!WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.WITHIP, false, false, result)) { WebParameterUtils.buildFailResult(strBuffer, result.errInfo); return strBuffer; }
