This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cab7dcb [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
cab7dcb is described below
commit cab7dcb168ef992d6e8546b324e13211b2438c95
Author: gosonzhang <[email protected]>
AuthorDate: Thu Feb 17 16:57:04 2022 +0800
[INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
---
.../server/broker/web/BrokerAdminServlet.java | 22 ++-
.../tubemq/server/common/TServerConstants.java | 2 +-
.../master/web/handler/WebOtherInfoHandler.java | 176 +++++++++++++++++++++
3 files changed, 186 insertions(+), 14 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index bcdaa5b..0057eb6 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1101,7 +1101,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
broker.getOffsetManager().deleteGroupOffset(
onlyMemory, groupTopicPartMap, modifier);
// builder return result
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ WebParameterUtils.buildSuccessResult(sBuffer);
}
/**
@@ -1120,10 +1120,12 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
return;
}
final boolean needRefresh = (Boolean) result.getRetData();
-
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"probeTime\":\"")
+ // build return result
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ sBuffer.append("{\"probeTime\":\"")
.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
.append("\",\"nodeName\":\"").append(broker.getTubeConfig().getHostName())
-
.append("\",\"role\":\"Broker\",\"metrics\":{\"serviceStatus\":");
+
.append("\",\"nodeRole\":\"Broker\",\"metrics\":{\"serviceStatus\":");
if (needRefresh) {
BrokerSrvStatsHolder.snapShort(sBuffer);
sBuffer.append(",\"webAPI\":");
@@ -1134,6 +1136,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
WebCallStatsHolder.getValue(sBuffer);
}
sBuffer.append("},\"count\":2}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
}
/**
@@ -1160,7 +1163,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
// query data
int index = 0;
int recordId = 0;
-
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"dataSet\":[");
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
Map<String, ConcurrentHashMap<Integer, MessageStore>>
messageTopicStores =
broker.getStoreManager().getMessageStores();
if (topicNameSet.isEmpty()) {
@@ -1222,7 +1225,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
sBuffer.append("]}");
}
}
- sBuffer.append("],\"totalCount\":").append(recordId).append("}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, recordId);
}
/**
@@ -1269,13 +1272,6 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
*/
public void adminDisableAllStats(HttpServletRequest req,
StringBuilder sBuffer) {
- ProcessResult result = new ProcessResult();
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
- return;
- }
- String statsType = (String) result.getRetData();
innEnableOrDisableMetricsStats(false,
BrokerStatsType.ALL.getName(), req, sBuffer);
}
@@ -1365,7 +1361,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
}
}
// builder return result
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ WebParameterUtils.buildSuccessResult(sBuffer);
}
// build reset offset info
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index 8f49962..a336ae8 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -100,6 +100,6 @@ public final class TServerConstants {
DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
// Minimum snapshot period
- public static final long MIN_SNAPSHOT_PERIOD_MS = 5000L;
+ public static final long MIN_SNAPSHOT_PERIOD_MS = 2000L;
public static final int META_MAX_STATSTYPE_LENGTH = 256;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 10791a0..0cc1fc6 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -30,8 +30,12 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
+import org.apache.inlong.tubemq.server.common.TubeServerVersion;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
import org.apache.inlong.tubemq.server.master.TMaster;
import
org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import
org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
@@ -39,6 +43,8 @@ import
org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeTyp
import
org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import
org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import
org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
+import org.apache.inlong.tubemq.server.master.stats.MasterSrvStatsHolder;
+import org.apache.inlong.tubemq.server.master.stats.MasterStatsType;
public class WebOtherInfoHandler extends AbstractWebHandler {
@@ -58,6 +64,21 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
"getSubscribeInfo");
registerQueryWebMethod("admin_query_consume_group_detail",
"getConsumeGroupDetailInfo");
+ // query master's version
+ registerQueryWebMethod("admin_query_server_version",
+ "adminQueryMasterVersion");
+ // register query method
+ registerQueryWebMethod("admin_get_metrics_info",
+ "adminGetMetricsInfo");
+ // Enable metrics statistics
+ registerModifyWebMethod("admin_enable_stats",
+ "adminEnableMetricsStats");
+ // Disable metrics statistics
+ registerModifyWebMethod("admin_disable_stats",
+ "adminDisableMetricsStats");
+ // Disable unnecessary statistics
+ registerModifyWebMethod("admin_disable_all_stats",
+ "adminDisableAllStats");
}
/**
@@ -261,6 +282,161 @@ public class WebOtherInfoHandler extends
AbstractWebHandler {
}
/**
+     * Query the Master's version.
+     *
+     * <p>This handler registers the web method {@code admin_query_server_version}
+     * with the Java method name {@code adminQueryMasterVersion}; the method must
+     * carry exactly that name to be found.
+     *
+     * @param req      Http Servlet Request (not read by this method)
+     * @param sBuffer  string buffer the response is appended to
+     * @param result   process result (not used by this method)
+     * @return the same {@code sBuffer}, holding a success response with a single version record
+     */
+    public StringBuilder adminQueryMasterVersion(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"version\":\"")
+                .append(TubeServerVersion.SERVER_VERSION).append("\"}");
+        // exactly one record was written between the begin/end wrappers
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Report the Master's current metric values.
+     *
+     * <p>Reads the {@code NEEDREFRESH} request parameter. When it is true the
+     * values are produced by the holders' {@code snapShort} calls, otherwise by
+     * their {@code getValue} calls.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer the response is appended to
+     * @param result   process result carrying the parsed parameter or the error message
+     * @return the same {@code sBuffer}, holding either a failure response or the metric record
+     */
+    public StringBuilder adminGetMetricsInfo(HttpServletRequest req,
+                                             StringBuilder sBuffer,
+                                             ProcessResult result) {
+        // parse the refresh flag; a parse failure is reported back to the caller
+        boolean parsed = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false, sBuffer, result);
+        if (!parsed) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final boolean refresh = (Boolean) result.getRetData();
+        // record header: probe time, node name and node role
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"probeTime\":\"");
+        sBuffer.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
+        sBuffer.append("\",\"nodeName\":\"");
+        sBuffer.append(master.getMasterConfig().getHostName());
+        sBuffer.append("\",\"nodeRole\":\"Master\",\"metrics\":{\"serviceStatus\":");
+        // metric groups: service status first, then web API calls
+        if (!refresh) {
+            MasterSrvStatsHolder.getValue(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.getValue(sBuffer);
+        } else {
+            MasterSrvStatsHolder.snapShort(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.snapShort(sBuffer);
+        }
+        sBuffer.append("},\"count\":2}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Enable one group of the Master's statistics.
+     *
+     * <p>The group is named by the {@code STATSTYPE} request parameter; if that
+     * parameter cannot be obtained, a failure response is written instead.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer the response is appended to
+     * @param result   process result carrying the parsed parameter or the error message
+     * @return the same {@code sBuffer}, holding the operation's response
+     */
+    public StringBuilder adminEnableMetricsStats(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        boolean parsed = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result);
+        if (parsed) {
+            // hand the requested type over to the shared enable/disable routine
+            String requestedType = (String) result.getRetData();
+            return innEnableOrDisableMetricsStats(true,
+                    requestedType, req, sBuffer, result);
+        }
+        WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+        return sBuffer;
+    }
+
+    /**
+     * Disable one group of the Master's statistics.
+     *
+     * <p>The group is named by the {@code STATSTYPE} request parameter; if that
+     * parameter cannot be obtained, a failure response is written instead.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer the response is appended to
+     * @param result   process result carrying the parsed parameter or the error message
+     * @return the same {@code sBuffer}, holding the operation's response
+     */
+    public StringBuilder adminDisableMetricsStats(HttpServletRequest req,
+                                                  StringBuilder sBuffer,
+                                                  ProcessResult result) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        String statsType = (String) result.getRetData();
+        // "false" = disable; passing true here would switch the statistics ON
+        return innEnableOrDisableMetricsStats(false, statsType, req, sBuffer, result);
+    }
+
+    /**
+     * Disable all of the Master's statistics in one call.
+     *
+     * <p>No request parameter is read; the statistics type is fixed to
+     * {@link MasterStatsType#ALL}, the same enum the enable/disable routine
+     * matches names against.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer the response is appended to
+     * @param result   process result
+     * @return the same {@code sBuffer}, holding the operation's response
+     */
+    public StringBuilder adminDisableAllStats(HttpServletRequest req,
+                                              StringBuilder sBuffer,
+                                              ProcessResult result) {
+        return innEnableOrDisableMetricsStats(false,
+                MasterStatsType.ALL.getName(), req, sBuffer, result);
+    }
+
+    /**
+     * Switch one group of the Master's statistics on or off.
+     *
+     * <p>{@code statsType} is compared case-insensitively with the name of each
+     * {@link MasterStatsType}. Without a match a failure response (errCode 400)
+     * listing the known types is written. With a match, only {@code WEBAPI} and
+     * {@code ALL} change anything in this method: they set the web-call
+     * statistics status. Any other matched type still reports success.
+     *
+     * @param enable     whether enable or disable
+     * @param statsType  the statistics type to be operated on
+     * @param req        HttpServletRequest (not read by this method)
+     * @param sBuffer    buffer the response is appended to
+     * @param result     process result (not used by this method)
+     * @return the same {@code sBuffer}, holding the operation's response
+     */
+    private StringBuilder innEnableOrDisableMetricsStats(boolean enable,
+                                                         String statsType,
+                                                         HttpServletRequest req,
+                                                         StringBuilder sBuffer,
+                                                         ProcessResult result) {
+        final MasterStatsType[] knownTypes = MasterStatsType.values();
+        // resolve the requested type by name; the first match wins
+        MasterStatsType target = null;
+        for (int i = 0; i < knownTypes.length && target == null; i++) {
+            if (knownTypes[i].getName().equalsIgnoreCase(statsType)) {
+                target = knownTypes[i];
+            }
+        }
+        if (target == null) {
+            // NOTE(review): the list below prints getDesc() while the lookup above
+            // matches on getName() - confirm callers can derive a valid name from it
+            sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":")
+                    .append("\"Unmatched stat type, allowed stat type are : [");
+            for (int i = 0; i < knownTypes.length; i++) {
+                if (i > 0) {
+                    sBuffer.append(",");
+                }
+                sBuffer.append(knownTypes[i].getDesc());
+            }
+            sBuffer.append("]\"}");
+            return sBuffer;
+        }
+        // only the web API statistics have a switch that is operated here
+        final boolean touchesWebApi =
+                target == MasterStatsType.WEBAPI || target == MasterStatsType.ALL;
+        if (touchesWebApi) {
+            WebCallStatsHolder.setStatsStatus(enable);
+        }
+        WebParameterUtils.buildSuccessResult(sBuffer);
+        return sBuffer;
+    }
+
+ /**
* Private method to append consumer info of the give list to a string
builder
*
* @param consumerList consumer list