This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 59aea8948 [INLONG-4942][TubeMQ] Add the display of the IP address of consumer (#4944)
59aea8948 is described below
commit 59aea8948ad76f4d6d87f24f571c343a80585c7c
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Jul 9 12:16:12 2022 +0800
[INLONG-4942][TubeMQ] Add the display of the IP address of consumer (#4944)
---
.../tubemq/server/broker/BrokerServiceServer.java | 18 ++++++++++------
.../broker/msgstore/MessageStoreManager.java | 2 +-
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 25 +++++++++++++++-------
.../server/broker/web/BrokerAdminServlet.java | 7 +++---
.../inlong/tubemq/server/master/TMaster.java | 8 ++++---
.../nodemanage/nodeconsumer/ConsumerInfo.java | 14 ++++++++++--
.../master/web/handler/WebOtherInfoHandler.java | 3 ++-
7 files changed, 52 insertions(+), 25 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 892f6c1c3..a0361398d 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -868,7 +868,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
partLock = brokerRowLock.getLock(null,
StringUtils.getBytesUtf8(partStr), true);
if (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER) {
return inProcessConsumerRegister(clientId, groupName,
- topicName, partStr, filterCondSet, overtls,
request, builder, strBuffer);
+ topicName, partStr, filterCondSet, rmtAddress,
+ overtls, request, builder, strBuffer);
} else if (request.getOpType() ==
RpcConstants.MSG_OPTYPE_UNREGISTER) {
return inProcessConsumerUnregister(clientId, groupName,
topicName, partStr, request, overtls, builder,
strBuffer);
@@ -906,16 +907,17 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
* @param topicName the topic name
* @param partStr the group-topic-partitionId key
* @param filterCondSet the filter condition set
+ * @param msgRcvFrom the address message received
* @param overtls whether transfer over TLS
* @param request the request
* @param builder the response builder
* @param strBuffer the string buffer
* @return the response
*/
- private RegisterResponseB2C inProcessConsumerRegister(final String
clientId, final String groupName,
- final String
topicName, final String partStr,
- final Set<String>
filterCondSet, boolean overtls,
- RegisterRequestC2B
request,
+ private RegisterResponseB2C inProcessConsumerRegister(String clientId,
String groupName,
+ String topicName,
String partStr,
+ Set<String>
filterCondSet, String msgRcvFrom,
+ boolean overtls,
RegisterRequestC2B request,
RegisterResponseB2C.Builder builder,
StringBuilder
strBuffer) {
String consumerId = null;
@@ -929,8 +931,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
String reqSessionKey = request.hasSessionKey() ?
request.getSessionKey() : null;
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() :
TBaseConstants.META_VALUE_UNDEFINED;
- consumerNodeInfo = new ConsumerNodeInfo(storeManager,
reqQryPriorityId,
- clientId, filterCondSet, reqSessionKey, reqSessionTime,
true, partStr);
+ consumerNodeInfo =
+ new ConsumerNodeInfo(storeManager, reqQryPriorityId,
clientId,
+ filterCondSet, reqSessionKey, reqSessionTime,
+ true, partStr, msgRcvFrom);
if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
BrokerSrvStatsHolder.incConsumerOnlineCnt();
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6f10324a8..1539c9294 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -367,7 +367,7 @@ public class MessageStoreManager implements StoreService {
final long maxOffset = msgStore.getIndexMaxOffset();
ConsumerNodeInfo consumerNodeInfo =
new ConsumerNodeInfo(tubeBroker.getStoreManager(),
- "visit", filterCondSet, "",
System.currentTimeMillis(), "");
+ "visit", filterCondSet, "",
System.currentTimeMillis(), "", "");
int maxIndexReadSize = (msgCount + 1)
* DataStoreUtils.STORE_INDEX_HEAD_LEN *
msgStore.getPartitionNum();
if (filterCondSet != null && !filterCondSet.isEmpty()) {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 487b95ec6..2071ac474 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -45,6 +45,7 @@ public class ConsumerNodeInfo {
// filter conditions in int format
private final Set<Integer> filterCondCode = new HashSet<>(10);
// consumer's address
+ private String addrRcvFrom;
private String rmtAddrInfo;
private boolean isSupportLimit = false;
private long nextStatTime = 0L;
@@ -70,12 +71,14 @@ public class ConsumerNodeInfo {
* @param sessionKey the session key
* @param sessionTime the session create time
* @param partStr the partition information
+ * @param msgRcvFrom the address message received
*/
- public ConsumerNodeInfo(final MessageStoreManager storeManager,
- final String consumerId, Set<String> filterCodes,
- final String sessionKey, long sessionTime, final
String partStr) {
+ public ConsumerNodeInfo(MessageStoreManager storeManager,
+ String consumerId, Set<String> filterCodes,
+ String sessionKey, long sessionTime, String
partStr,
+ String msgRcvFrom) {
this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
- filterCodes, sessionKey, sessionTime, false, partStr);
+ filterCodes, sessionKey, sessionTime, false, partStr,
msgRcvFrom);
}
/**
@@ -89,12 +92,13 @@ public class ConsumerNodeInfo {
* @param sessionTime the session create time
* @param isSupportLimit whether to support limited consumption
function
* @param partStr the partition information
+ * @param msgRcvFrom the address message received
*/
- public ConsumerNodeInfo(final MessageStoreManager storeManager,
- final int qryPriorityId, final String consumerId,
- Set<String> filterCodes, final String sessionKey,
+ public ConsumerNodeInfo(MessageStoreManager storeManager,
+ int qryPriorityId, String consumerId,
+ Set<String> filterCodes, String sessionKey,
long sessionTime, boolean isSupportLimit,
- final String partStr) {
+ String partStr, String msgRcvFrom) {
setConsumerId(consumerId);
if (filterCodes != null) {
for (String filterItem : filterCodes) {
@@ -107,6 +111,7 @@ public class ConsumerNodeInfo {
this.qryPriorityId.set(qryPriorityId);
this.storeManager = storeManager;
this.partStr = partStr;
+ this.addrRcvFrom = msgRcvFrom;
this.createTime = System.currentTimeMillis();
if (filterCodes != null && !filterCodes.isEmpty()) {
this.isFilterConsume = true;
@@ -245,6 +250,10 @@ public class ConsumerNodeInfo {
return sessionTime;
}
+ public String getAddrRcvFrom() {
+ return addrRcvFrom;
+ }
+
public void setLastProcInfo(long lastGetTime, long lastRdDataOffset, int
totalMsgSize) {
this.lastGetTime = lastGetTime;
this.lastDataRdOffset = lastRdDataOffset;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index ab00b5bca..1f4b92cd6 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -210,7 +210,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
.append(regTime).append(",\"isFilterConsume\":")
.append(ifFilterConsume);
}
-
strBuff.append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
+
strBuff.append(",\"receivedFrom\":\"").append(entry.getValue().getAddrRcvFrom())
+
.append("\",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
.append(",\"curDataLimitInM\":").append(entry.getValue().getCurFlowCtrlLimitSize())
.append(",\"curFreqLimit\":").append(entry.getValue().getCurFlowCtrlFreqLimit())
.append(",\"totalSentSec\":").append(entry.getValue().getSentMsgSize())
@@ -777,7 +778,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// get the maximum query turns
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.MAXRETRYCOUNT, false,
- 2, 1, 5, sBuffer, result)) {
+ 2, 1, 30, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return;
}
@@ -853,7 +854,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
filterCodes.add(groupName);
// build consumer node information
ConsumerNodeInfo consumerNodeInfo = new
ConsumerNodeInfo(broker.getStoreManager(),
- "offsetConsumer", filterCodes, "", System.currentTimeMillis(),
"");
+ "offsetConsumer", filterCodes, "", System.currentTimeMillis(),
"", "");
// query records from storage
int qryRetryCount = 0;
long itemInitOffset = requestOffset;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index edef4c269..2d38c5c18 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -516,7 +516,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
*/
@Override
public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request,
- final String rmtAddress,
+ String rmtAddress,
boolean overtls) throws
Exception {
// #lizard forgives
ProcessResult result = new ProcessResult();
@@ -592,7 +592,8 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
ConsumerInfo inConsumerInfo =
new ConsumerInfo(consumerId, overtls, groupName,
reqTopicSet, reqTopicConditions, csmType,
- sessionKey, sessionTime, sourceCount, isSelectBig,
requiredPartMap);
+ sessionKey, sessionTime, sourceCount,
+ isSelectBig, requiredPartMap, rmtAddress);
paramCheckResult =
PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
masterConfig, defMetaDataService, brokerRunManager,
strBuffer);
@@ -1269,7 +1270,8 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
ConsumerInfo inConsumerInfo =
new ConsumerInfo(consumerId, overtls, groupName, csmType,
sourceCount, nodeId, reqTopicSet, reqTopicConditions,
- opsTaskInfo.getCsmFromMaxOffsetCtrlId(),
clientSyncInfo);
+ opsTaskInfo.getCsmFromMaxOffsetCtrlId(),
clientSyncInfo,
+ rmtAddress);
// need removed for authorize center begin
if (!this.defMetaDataService
.isConsumeTargetAuthorized(consumerId, groupName,
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
index 4828e98d5..2a9eb18ba 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -36,6 +36,7 @@ public class ConsumerInfo implements
Comparable<ConsumerInfo>, Serializable {
private final String group;
private final Set<String> topicSet;
private final Map<String, TreeSet<String>> topicConditions;
+ private String addrRcvFrom;
private boolean overTLS = false;
private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
@@ -64,15 +65,17 @@ public class ConsumerInfo implements
Comparable<ConsumerInfo>, Serializable {
* @param sourceCount the minimum consumer count of consume group
* @param selectedBig whether to choose a larger value if there is
a conflict
* @param requiredPartition the required partitions
+ * @param msgRcvFrom the address received message
*/
public ConsumerInfo(String consumerId, boolean overTLS, String group,
Set<String> topicSet, Map<String, TreeSet<String>>
topicConditions,
ConsumeType consumeType, String sessionKey, long
startTime,
int sourceCount, boolean selectedBig,
- Map<String, Long> requiredPartition) {
+ Map<String, Long> requiredPartition, String
msgRcvFrom) {
this.group = group;
this.consumeType = consumeType;
this.consumerId = consumerId;
+ this.addrRcvFrom = msgRcvFrom;
this.overTLS = overTLS;
this.topicSet = topicSet;
if (topicConditions == null) {
@@ -101,14 +104,16 @@ public class ConsumerInfo implements
Comparable<ConsumerInfo>, Serializable {
* @param topicConditions the topic filter condition set
* @param curCsmCtrlId the node's consume control id
* @param syncInfo the consumer report information
+ * @param msgRcvFrom the address received message
*/
public ConsumerInfo(String consumerId, boolean overTLS, String group,
ConsumeType consumeType, int sourceCount, int nodeId,
Set<String> topicSet, Map<String, TreeSet<String>>
topicConditions,
- long curCsmCtrlId, ClientSyncInfo syncInfo) {
+ long curCsmCtrlId, ClientSyncInfo syncInfo, String
msgRcvFrom) {
this.group = group;
this.consumeType = consumeType;
this.consumerId = consumerId;
+ this.addrRcvFrom = msgRcvFrom;
this.overTLS = overTLS;
this.topicSet = topicSet;
if (topicConditions == null) {
@@ -132,6 +137,7 @@ public class ConsumerInfo implements
Comparable<ConsumerInfo>, Serializable {
return;
}
this.overTLS = inCsmInfo.overTLS;
+ this.addrRcvFrom = inCsmInfo.addrRcvFrom;
this.nodeId = inCsmInfo.getNodeId();
updClientReportInfo(inCsmInfo.getCsmFromMaxOffsetCtrlId(),
inCsmInfo.getLstAssignedTime(),
inCsmInfo.getUsedTopicMetaInfoId());
@@ -211,6 +217,10 @@ public class ConsumerInfo implements
Comparable<ConsumerInfo>, Serializable {
return topicConditions;
}
+ public String getAddrRcvFrom() {
+ return addrRcvFrom;
+ }
+
public boolean isOverTLS() {
return overTLS;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 0d02675af..4418fa68e 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -496,7 +496,8 @@ public class WebOtherInfoHandler extends AbstractWebHandler
{
strBuffer.append(",");
}
strBuffer.append("{\"consumerId\":\"").append(consumer.getConsumerId())
-
.append("\"").append(",\"isOverTLS\":").append(consumer.isOverTLS());
+
.append("\",\"receivedFrom\":\"").append(consumer.getAddrRcvFrom())
+
.append("\",\"isOverTLS\":").append(consumer.isOverTLS());
if (consumeType == ConsumeType.CONSUME_BAND) {
Map<String, Long> requiredPartition =
consumer.getRequiredPartition();
if (requiredPartition == null ||
requiredPartition.isEmpty()) {