This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 1b854fb413d1226572343627cf52a8e40d0410f4 Author: gosonzhang <[email protected]> AuthorDate: Mon Jan 4 19:39:55 2021 +0800 [TUBEMQ-485]Add the batch setting API of consume group offset --- .../org/apache/tubemq/corebase/utils/Tuple3.java | 48 +++++ .../server/broker/offset/DefaultOffsetManager.java | 83 +++----- .../tubemq/server/broker/offset/OffsetService.java | 7 +- .../server/broker/web/BrokerAdminServlet.java | 228 ++++++++++++++++++++- .../tubemq/server/common/fielddef/WebFieldDef.java | 8 +- .../server/common/utils/WebParameterUtils.java | 71 +++++++ .../tubemq/server/common/webbase/WebFieldType.java | 3 +- 7 files changed, 388 insertions(+), 60 deletions(-) diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java new file mode 100644 index 0000000..a2d98c3 --- /dev/null +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.corebase.utils; + +public class Tuple3<T0, T1, T2> { + + /** Field 0 of the tuple. */ + public T0 f0 = null; + /** Field 1 of the tuple. */ + public T1 f1 = null; + /** Field 2 of the tuple. */ + public T2 f2 = null; + + /** + * Creates a new tuple where all fields are null. + */ + public Tuple3() { + + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param value0 The value for field 0 + * @param value1 The value for field 1 + * @param value2 The value for field 2 + */ + public Tuple3(T0 value0, T1 value1, T2 value2) { + this.f0 = value0; + this.f1 = value1; + this.f2 = value2; + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index df3afc4..84dabb2 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -19,6 +19,7 @@ package org.apache.tubemq.server.broker.offset; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -26,9 +27,9 @@ import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.daemon.AbstractDaemonService; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.Tuple2; +import org.apache.tubemq.corebase.utils.Tuple3; import org.apache.tubemq.server.broker.BrokerConfig; import org.apache.tubemq.server.broker.msgstore.MessageStore; -import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.apache.tubemq.server.common.offsetstorage.OffsetStorage; import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo; @@ -119,8 +120,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse || (readStatus == TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS)) { long adjOffset = indexMaxOffset; if (readStatus != TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS) { - adjOffset = reqOffset > indexMaxOffset ? indexMaxOffset : reqOffset; - adjOffset = adjOffset < indexMinOffset ? indexMinOffset : adjOffset; + adjOffset = Math.min(reqOffset, indexMaxOffset); + adjOffset = Math.max(adjOffset, indexMinOffset); } regInfo.getAndSetOffset(adjOffset); } @@ -288,7 +289,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse long firstOffset = store.getIndexMinOffset(); long lastOffset = store.getIndexMaxOffset(); reSetOffset = reSetOffset < firstOffset - ? firstOffset : reSetOffset > lastOffset ? lastOffset : reSetOffset; + ? firstOffset : Math.min(reSetOffset, lastOffset); String offsetCacheKey = getOffsetCacheKey(topic, partitionId); getAndResetTmpOffset(group, offsetCacheKey); OffsetStorageInfo regInfo = @@ -449,70 +450,46 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse return result; } - /*** * Reset offset. * - * @param storeManager * @param groups - * @param topicPartOffsetMap + * @param topicPartOffsets * @param modifier * @return at least one record modified */ @Override - public boolean modifyGroupOffset( - MessageStoreManager storeManager, Set<String> groups, - Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) { + public boolean modifyGroupOffset(Set<String> groups, + List<Tuple3<String, Integer, Long>> topicPartOffsets, + String modifier) { long oldOffset = -1; - long reSetOffset = -1; boolean changed = false; - MessageStore store = null; + String offsetCacheKey = null; StringBuilder strBuidler = new StringBuilder(512); // set offset by group for (String group : groups) { - for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry - : topicPartOffsetMap.entrySet()) { - Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue(); - if (partOffsetMap == null) { + for (Tuple3<String, Integer, Long> tuple3 : topicPartOffsets) { + if (tuple3 == null + || tuple3.f0 == null + || tuple3.f1 == null + || tuple3.f2 == null) { continue; } - // set offset - for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) { - if (entry1.getValue() == null) { - continue; - } - Tuple2<Long, Long> offsetTuple = entry1.getValue(); - // get topic store - try { - store = storeManager.getOrCreateMessageStore( - entry.getKey(), entry1.getKey()); - } catch (Throwable e) { - // - } - if (store == null) { - continue; - } - long firstOffset = store.getIndexMinOffset(); - long lastOffset = store.getIndexMaxOffset(); - // adjust reseted offset value - reSetOffset = offsetTuple.f0 < firstOffset - ? firstOffset : Math.min(offsetTuple.f0, lastOffset); - String offsetCacheKey = - getOffsetCacheKey(entry.getKey(), entry1.getKey()); - getAndResetTmpOffset(group, offsetCacheKey); - OffsetStorageInfo regInfo = loadOrCreateOffset(group, - entry.getKey(), entry1.getKey(), offsetCacheKey, 0); - oldOffset = regInfo.getAndSetOffset(reSetOffset); - changed = true; - logger.info(strBuidler - .append("[Offset Manager] Update offset by modifier=") - .append(modifier).append(",reset offset=").append(reSetOffset) - .append(",old offset=").append(oldOffset) - .append(",updated offset=").append(regInfo.getOffset()) - .append(",group=").append(group) - .append(",topic-partId=").append(offsetCacheKey).toString()); - strBuidler.delete(0, strBuidler.length()); - } + // set offset value + offsetCacheKey = getOffsetCacheKey(tuple3.f0, tuple3.f1); + getAndResetTmpOffset(group, offsetCacheKey); + OffsetStorageInfo regInfo = loadOrCreateOffset(group, + tuple3.f0, tuple3.f1, offsetCacheKey, 0); + oldOffset = regInfo.getAndSetOffset(tuple3.f2); + changed = true; + logger.info(strBuidler + .append("[Offset Manager] Update offset by modifier=") + .append(modifier).append(",reset offset=").append(tuple3.f2) + .append(",old offset=").append(oldOffset) + .append(",updated offset=").append(regInfo.getOffset()) + .append(",group=").append(group) + .append(",topic-partId=").append(offsetCacheKey).toString()); + strBuidler.delete(0, strBuidler.length()); } } return changed; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java index fcebdfc..9dcd29a 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java @@ -17,12 +17,13 @@ package org.apache.tubemq.server.broker.offset; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.tubemq.corebase.utils.Tuple2; +import org.apache.tubemq.corebase.utils.Tuple3; import org.apache.tubemq.server.broker.msgstore.MessageStore; -import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo; @@ -68,7 +69,7 @@ public interface OffsetService { Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset( String group, Map<String, Set<Integer>> topicPartMap); - boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups, - Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, + boolean modifyGroupOffset(Set<String> groups, + List<Tuple3<String, Integer, Long>> topicPartOffsets, String modifier); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index d8f85d4..c76a6b7 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -17,6 +17,7 @@ package org.apache.tubemq.server.broker.web; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -25,9 +26,11 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.servlet.http.HttpServletRequest; + import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.Tuple2; +import org.apache.tubemq.corebase.utils.Tuple3; import org.apache.tubemq.server.broker.TubeBroker; import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.MessageStore; @@ -89,6 +92,9 @@ public class BrokerAdminServlet extends AbstractWebHandler { // clone consumer group's offset from source to target innRegisterWebMethod("admin_clone_offset", "adminCloneGroupOffSet"); + // set or update group's offset info + innRegisterWebMethod("admin_set_offset", + "adminSetGroupOffSet"); } public void adminQueryAllMethods(HttpServletRequest req, @@ -759,6 +765,74 @@ public class BrokerAdminServlet extends AbstractWebHandler { } /*** + * Add or Modify consumer group offset. + * + * @param req + * @param sBuilder process result + */ + public void adminSetGroupOffSet(HttpServletRequest req, + StringBuilder sBuilder) { + // get group list + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Set<String> groupNameSet = (Set<String>) result.retData1; + // get set mode + result = WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.MANUALSET, true, false); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + boolean manualSet = (Boolean) result.retData1; + // get modify user + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + List<Tuple3<String, Integer, Long>> resetOffsets; + final String modifier = (String) result.retData1; + if (manualSet) { + // get offset json info + result = WebParameterUtils.getJsonDictParamValue(req, + WebFieldDef.OFFSETJSON, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Map<String, Long> manOffsets = + (Map<String, Long>) result.retData1; + // valid and transfer offset format + result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + resetOffsets = + (List<Tuple3<String, Integer, Long>>) result.retData1; + } else { + // get the topic set to be set + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, true, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + Set<String> topicSet = (Set<String>) result.retData1; + // transfer offset format + resetOffsets = buildOffsetResetInfo(topicSet); + } + boolean changed = broker.getOffsetManager().modifyGroupOffset( + groupNameSet, resetOffsets, modifier); + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); + } + + /*** * Clone consume group offset, clone A group's offset to other group. * * @param req @@ -821,12 +895,162 @@ public class BrokerAdminServlet extends AbstractWebHandler { // query offset from source group Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets = broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap); + // transfer offset format + List<Tuple3<String, Integer, Long>> resetOffsets = + buildOffsetResetInfo(srcGroupOffsets); boolean changed = broker.getOffsetManager().modifyGroupOffset( - broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier); + tgtGroupNameSet, resetOffsets, modifier); // builder return result sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } + // build reset offset info + private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo( + Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) { + long adjOffset = -1; + MessageStore store = null; + List<Tuple3<String, Integer, Long>> result = new ArrayList<>(); + MessageStoreManager storeManager = broker.getStoreManager(); + for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry + : topicPartOffsetMap.entrySet()) { + Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue(); + if (partOffsetMap == null) { + continue; + } + // process offset value + for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) { + if (entry1.getValue() == null) { + continue; + } + Tuple2<Long, Long> offsetTuple = entry1.getValue(); + // get topic store + try { + store = storeManager.getOrCreateMessageStore( + entry.getKey(), entry1.getKey()); + } catch (Throwable e) { + // + } + if (store == null) { + continue; + } + long firstOffset = store.getIndexMinOffset(); + long lastOffset = store.getIndexMaxOffset(); + // adjust reset offset value + adjOffset = offsetTuple.f0 < firstOffset + ? firstOffset : Math.min(offsetTuple.f0, lastOffset); + result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset)); + } + } + return result; + } + + // build reset offset info + private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(Set<String> topicSet) { + MessageStore store = null; + List<Tuple3<String, Integer, Long>> result = new ArrayList<>(); + MessageStoreManager storeManager = broker.getStoreManager(); + // get topic's partition set + Map<String, Set<Integer>> topicPartMap = + validAndGetPartitions(null, topicSet); + // fill current topic's max offset value + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + if (entry.getKey() == null + || entry.getValue() == null + || entry.getValue().isEmpty()) { + continue; + } + Set<Integer> partitionSet = entry.getValue(); + for (Integer partId : partitionSet) { + // get topic store + try { + store = storeManager.getOrCreateMessageStore( + entry.getKey(), partId); + } catch (Throwable e) { + // + } + if (store == null) { + continue; + } + result.add(new Tuple3<>(entry.getKey(), + partId, store.getIndexMaxOffset())); + } + } + return result; + } + + // build reset offset info + private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef, + Map<String, Long> manOffsetInfoMap) { + String brokerId; + String topicName; + String strPartId; + int partitionId; + long adjOffset; + MessageStore store = null; + ProcessResult procResult = new ProcessResult(); + MessageStoreManager storeManager = broker.getStoreManager(); + List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>(); + String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId()); + // get topic configure infos + Map<String, TopicMetadata> topicConfigMap = + broker.getMetadataManager().getTopicConfigMap(); + for (Map.Entry<String, Long> entry : manOffsetInfoMap.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + // parse and check partitionKey value + String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP); + if (keyItems.length != 3) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name).append("'s key invalid:") + .append(entry.getKey()) + .append(" must be brokerId:topicName:partitionId !").toString()); + return procResult; + } + brokerId = keyItems[0].trim(); + topicName = keyItems[1].trim(); + strPartId = keyItems[2].trim(); + if (!localBrokerId.equals(brokerId) + || !topicConfigMap.containsKey(topicName)) { + continue; + } + try { + partitionId = Integer.parseInt(strPartId); + } catch (NumberFormatException e) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name).append("'s key invalid:") + .append(entry.getKey()) + .append("'s partitionId value not number!").toString()); + return procResult; + } + // check and adjust offset value + try { + store = storeManager.getOrCreateMessageStore(topicName, partitionId); + } catch (Throwable e) { + // + } + if (store == null) { + continue; + } + long firstOffset = store.getIndexMinOffset(); + long lastOffset = store.getIndexMaxOffset(); + adjOffset = entry.getValue() < firstOffset + ? firstOffset : Math.min(entry.getValue(), lastOffset); + offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset)); + } + if (offsetVals.isEmpty()) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append("'s value is invalid!").toString()); + } else { + procResult.setSuccResult(offsetVals); + } + return procResult; + } + // builder group's offset info private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo( Set<String> groupSet, Set<String> topicSet) { @@ -872,7 +1096,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) { Map<String, Set<Integer>> topicPartMap = new HashMap<>(); // query stored topic set stored in memory or zk - if (topicSet.isEmpty()) { + if (topicSet.isEmpty() && group != null) { topicSet = broker.getOffsetManager().getGroupSubInfo(group); } // get topic's partitionIds diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java index ec97421..45b862d 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -78,7 +78,13 @@ public enum WebFieldDef { RegexDef.TMP_GROUP), TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup", WebFieldType.COMPSTRING, "Offset clone target group name", - TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP); + TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP), + MANUALSET(20, "manualSet", "manSet", + WebFieldType.BOOLEAN, "Whether manual offset setting mode"), + OFFSETJSON(21, "offsetJsonSet", "offsetSet", + WebFieldType.JSONTYPE, "The offset set that needs to be added or modified"); + + diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index 1202d33..fddd5de 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -478,6 +478,77 @@ public class WebParameterUtils { } /** + * Parse the parameter value from an json dict + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if failed to parse value from the given object + * @return valid result for the parameter value + */ + public static ProcessResult getJsonDictParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + Map<String, Long> defValue) { + ProcessResult procResult = new ProcessResult(); + // get parameter value + String paramValue = req.getParameter(fieldDef.name); + if (paramValue == null) { + paramValue = req.getParameter(fieldDef.shortName); + } + if (TStringUtils.isNotBlank(paramValue)) { + // Cleanup value extra characters + paramValue = escDoubleQuotes(paramValue.trim()); + } + // Check if the parameter exists + if (TStringUtils.isBlank(paramValue)) { + if (required) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append(" is missing or value is null or blank!").toString()); + } else { + procResult.setSuccResult(defValue); + } + return procResult; + } + try { + paramValue = URLDecoder.decode(paramValue, + TBaseConstants.META_DEFAULT_CHARSET_NAME); + } catch (UnsupportedEncodingException e) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append(" decode error, exception is ") + .append(e.toString()).toString()); + } + if (TStringUtils.isBlank(paramValue)) { + if (required) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append("'s value is blank!").toString()); + } else { + procResult.setSuccResult(defValue); + } + return procResult; + } + if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) { + if (paramValue.length() > fieldDef.valMaxLen) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append("'s length over max allowed length (") + .append(fieldDef.valMaxLen).append(")!").toString()); + return procResult; + } + } + procResult.setSuccResult(new Gson().fromJson(paramValue, + new TypeToken<Map<String, Long>>(){}.getType())); + return procResult; + } + + /** * process string default value * * @param procResult process result diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java index b83a966..2f32cb1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java @@ -28,7 +28,8 @@ public enum WebFieldType { BOOLEAN(4, "Boolean"), DATE(5, "Date"), COMPSTRING(6, "Compound string"), - COMPINT(7, "Compound integer"); + COMPINT(7, "Compound integer"), + JSONTYPE(8, "Json"); public int value;
