This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 31ad65f34 [INLONG-7184][TubeMQ] Replace CertifiedResult with
ProcessResult (#7185)
31ad65f34 is described below
commit 31ad65f3474f0fd5846eb409b1af7f86b6ac737e
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Jan 8 20:31:23 2023 +0800
[INLONG-7184][TubeMQ] Replace CertifiedResult with ProcessResult (#7185)
---
.../tubemq/server/broker/BrokerServiceServer.java | 73 ++++----
.../common/aaaserver/CertificateBrokerHandler.java | 13 +-
.../common/aaaserver/CertificateMasterHandler.java | 15 +-
.../{CertifiedResult.java => CertifiedInfo.java} | 42 ++---
.../aaaserver/SimpleCertificateBrokerHandler.java | 61 +++----
.../aaaserver/SimpleCertificateMasterHandler.java | 103 ++++++-----
.../inlong/tubemq/server/master/TMaster.java | 188 +++++++++------------
7 files changed, 223 insertions(+), 272 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 03c661370..94a901804 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -72,7 +72,7 @@ import
org.apache.inlong.tubemq.server.broker.stats.audit.AuditUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.TStatusConstants;
import
org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
-import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult;
+import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedInfo;
import org.apache.inlong.tubemq.server.common.exception.HeartbeatException;
import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo;
@@ -607,13 +607,12 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg("Write StoreService temporary unavailable!");
return builder.build();
}
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
// get and check clientId field
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer,
result)) {
builder.setErrCode(result.getErrCode());
@@ -659,12 +658,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
.append(checkSum).toString());
return builder.build();
}
- CertifiedResult authorizeResult =
- serverAuthHandler.validProduceAuthorizeInfo(
- certResult.userName, topicName, msgType, rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if (!serverAuthHandler.validProduceAuthorizeInfo(
+ certifiedInfo.getUserName(), topicName, msgType, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
try {
@@ -682,7 +679,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
AuditUtils.addProduceRecord(topicName,
request.getMsgType(), request.getMsgTime(), 1,
dataLength);
builder.setSuccess(true);
- builder.setRequireAuth(certResult.reAuth);
+ builder.setRequireAuth(certifiedInfo.isReAuth());
builder.setErrCode(TErrCodeConstants.SUCCESS);
// begin Deprecated, after 1.0, the ErrMsg set "Ok" or ""
builder.setErrMsg(String.valueOf(appendResult.getMsgId()));
@@ -811,21 +808,21 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
public RegisterResponseB2C consumerRegisterC2B(RegisterRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ ProcessResult result = new ProcessResult();
RegisterResponseB2C.Builder builder = RegisterResponseB2C.newBuilder();
builder.setSuccess(false);
builder.setCurrOffset(-1);
- CertifiedResult certResult =
serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
if (!this.started.get()) {
builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
builder.setErrMsg("StoreService temporary unavailable!");
return builder.build();
}
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- ProcessResult result = new ProcessResult();
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
final StringBuilder strBuffer = new StringBuilder(512);
// get and check clientId field
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer,
result)) {
@@ -859,12 +856,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
}
}
}
- CertifiedResult authorizeResult =
-
serverAuthHandler.validConsumeAuthorizeInfo(certResult.userName,
- groupName, topicName, filterCondSet, isRegister,
rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumeAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, topicName, filterCondSet, isRegister, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
Integer lid = null;
@@ -1098,13 +1093,12 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg("StoreService temporary unavailable!");
return builder.build();
}
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
// get and check clientId field
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer,
result)) {
builder.setErrCode(result.getErrCode());
@@ -1124,7 +1118,6 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
: TBaseConstants.META_VALUE_UNDEFINED;
List<Partition> partitions =
DataConverterUtil.convertPartitionInfo(request.getPartitionInfoList());
- CertifiedResult authorizeResult = null;
boolean isAuthorized = false;
List<String> failureInfo = new ArrayList<>();
for (Partition partition : partitions) {
@@ -1135,7 +1128,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
if (consumerNodeInfo == null) {
failureInfo.add(strBuffer.append(TErrCodeConstants.HB_NO_NODE)
.append(TokenConstants.ATTR_SEP)
- .append(partition.toString()).toString());
+ .append(partition).toString());
strBuffer.delete(0, strBuffer.length());
logger.warn(strBuffer.append("[Heartbeat Check] UnRegistered
Consumer:")
.append(clientId).append(TokenConstants.SEGMENT_SEP)
@@ -1145,7 +1138,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
}
if (!clientId.equals(consumerNodeInfo.getConsumerId())) {
failureInfo.add(strBuffer.append(TErrCodeConstants.DUPLICATE_PARTITION)
-
.append(TokenConstants.ATTR_SEP).append(partition.toString()).toString());
+
.append(TokenConstants.ATTR_SEP).append(partition).toString());
strBuffer.delete(0, strBuffer.length());
strBuffer.append("[Heartbeat Check] Duplicated partition:
Partition ").append(partStr)
.append(" has been consumed by
").append(consumerNodeInfo.getConsumerId())
@@ -1155,13 +1148,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
continue;
}
if (!isAuthorized) {
- authorizeResult =
-
serverAuthHandler.validConsumeAuthorizeInfo(certResult.userName,
- groupName, topic,
consumerNodeInfo.getFilterCondStrs(), true, rmtAddress);
- if (!authorizeResult.result) {
- builder.setRequireAuth(authorizeResult.reAuth);
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumeAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, topic,
consumerNodeInfo.getFilterCondStrs(), true, rmtAddress, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
isAuthorized = true;
@@ -1171,8 +1161,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
getHeartbeatNodeId(clientId, partStr));
} catch (HeartbeatException e) {
failureInfo.add(strBuffer.append(TErrCodeConstants.HB_NO_NODE)
- .append(TokenConstants.ATTR_SEP)
- .append(partition.toString()).toString());
+
.append(TokenConstants.ATTR_SEP).append(partition).toString());
strBuffer.delete(0, strBuffer.length());
logger.warn(strBuffer.append("[Heartbeat Check] Invalid
Request")
.append(clientId).append(TokenConstants.SEGMENT_SEP)
@@ -1184,7 +1173,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
consumerNodeInfo.setQryPriorityId(reqQryPriorityId);
}
}
- builder.setRequireAuth(certResult.reAuth);
+ builder.setRequireAuth(certifiedInfo.isReAuth());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setHasPartFailure(false);
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java
index 3c8a922e6..50a820d08 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java
@@ -20,6 +20,7 @@ package org.apache.inlong.tubemq.server.common.aaaserver;
import java.util.Set;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
public interface CertificateBrokerHandler {
@@ -27,15 +28,15 @@ public interface CertificateBrokerHandler {
void appendVisitToken(ClientMaster.MasterBrokerAuthorizedInfo
authorizedInfo);
- CertifiedResult identityValidUserInfo(ClientBroker.AuthorizedInfo
authorizedInfo,
- boolean isProduce);
+ boolean identityValidUserInfo(ClientBroker.AuthorizedInfo authorizedInfo,
+ boolean isProduce, ProcessResult result);
- CertifiedResult validProduceAuthorizeInfo(String userName, String
topicName,
- String msgType, String clientIp);
+ boolean validProduceAuthorizeInfo(String userName, String topicName,
+ String msgType, String clientIp, ProcessResult result);
- CertifiedResult validConsumeAuthorizeInfo(String userName, String
groupName,
+ boolean validConsumeAuthorizeInfo(String userName, String groupName,
String topicName, Set<String> msgTypeLst,
- boolean isRegister, String clientIp);
+ boolean isRegister, String clientIp, ProcessResult result);
boolean isEnableProduceAuthenticate();
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java
index ad497a4ce..eaf6401e8 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java
@@ -21,17 +21,20 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
public interface CertificateMasterHandler {
- CertifiedResult identityValidBrokerInfo(ClientMaster.MasterCertificateInfo
authenticInfo);
+ boolean identityValidBrokerInfo(
+ ClientMaster.MasterCertificateInfo authenticInfo, ProcessResult
result);
- CertifiedResult identityValidUserInfo(ClientMaster.MasterCertificateInfo
authenticInfo,
- boolean isProduce);
+ boolean identityValidUserInfo(ClientMaster.MasterCertificateInfo
authenticInfo,
+ boolean isProduce, ProcessResult result);
- CertifiedResult validProducerAuthorizeInfo(String userName, Set<String>
topics, String clientIp);
+ boolean validProducerAuthorizeInfo(String userName,
+ Set<String> topics, String clientIp, ProcessResult result);
- CertifiedResult validConsumerAuthorizeInfo(String userName, String
groupName, Set<String> topics,
- Map<String, TreeSet<String>> topicConds, String clientIp);
+ boolean validConsumerAuthorizeInfo(String userName, String groupName,
Set<String> topics,
+ Map<String, TreeSet<String>> topicConds, String clientIp,
ProcessResult result);
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java
similarity index 53%
rename from
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java
rename to
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java
index ebab5442a..25040f768 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java
@@ -17,42 +17,36 @@
package org.apache.inlong.tubemq.server.common.aaaserver;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+public class CertifiedInfo {
-public class CertifiedResult {
+ private String userName = "";
+ private String authorizedToken = "";
+ private boolean reAuth = false;
- public boolean result = false;
- public int errCode = TErrCodeConstants.BAD_REQUEST;
- public String errInfo = "Not authenticate!";
- public String authorizedToken = "";
- public boolean reAuth = false;
- public String userName = "";
+ public CertifiedInfo() {
- public CertifiedResult() {
-
- }
-
- public void setFailureResult(int errCode, final String resultInfo) {
- this.result = false;
- this.errCode = errCode;
- this.errInfo = resultInfo;
}
- public void setSuccessResult(final String userName, final String
authorizedToken) {
- this.result = true;
- this.errCode = TErrCodeConstants.SUCCESS;
- this.errInfo = "Ok!";
+ public CertifiedInfo(String userName, String authorizedToken) {
this.userName = userName;
this.authorizedToken = authorizedToken;
}
- public void setSuccessResult(final String userName, final String
authorizedToken, boolean reAuth) {
- this.result = true;
- this.errCode = TErrCodeConstants.SUCCESS;
- this.errInfo = "Ok!";
+ public void setSuccessResult(String userName, String authorizedToken,
boolean reAuth) {
this.userName = userName;
this.authorizedToken = authorizedToken;
this.reAuth = reAuth;
}
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getAuthorizedToken() {
+ return authorizedToken;
+ }
+
+ public boolean isReAuth() {
+ return reAuth;
+ }
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
index cebd578de..f310765fd 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
@@ -25,6 +25,7 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.broker.TubeBroker;
import org.slf4j.Logger;
@@ -79,12 +80,12 @@ public class SimpleCertificateBrokerHandler implements
CertificateBrokerHandler
}
lastUpdatedVisitTokens = curBrokerVisitTokens;
String[] visitTokenItems =
curBrokerVisitTokens.split(TokenConstants.ARRAY_SEP);
- for (int i = 0; i < visitTokenItems.length; i++) {
- if (TStringUtils.isBlank(visitTokenItems[i])) {
+ for (String visitTokenItem : visitTokenItems) {
+ if (TStringUtils.isBlank(visitTokenItem)) {
continue;
}
try {
- long curVisitToken = Long.parseLong(visitTokenItems[i].trim());
+ long curVisitToken = Long.parseLong(visitTokenItem.trim());
List<Long> currList = visitTokenList.get();
if (!currList.contains(curVisitToken)) {
while (true) {
@@ -109,13 +110,12 @@ public class SimpleCertificateBrokerHandler implements
CertificateBrokerHandler
}
@Override
- public CertifiedResult identityValidUserInfo(final
ClientBroker.AuthorizedInfo authorizedInfo,
- boolean isProduce) {
- CertifiedResult result = new CertifiedResult();
+ public boolean identityValidUserInfo(ClientBroker.AuthorizedInfo
authorizedInfo,
+ boolean isProduce, ProcessResult result) {
if (authorizedInfo == null) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Authorized Info is required!");
- return result;
+ return result.isSuccess();
}
if (enableVisitTokenCheck) {
long curVisitToken = authorizedInfo.getVisitAuthorizedToken();
@@ -123,58 +123,53 @@ public class SimpleCertificateBrokerHandler implements
CertificateBrokerHandler
if (tubeBroker.isKeepAlive()) {
if (!currList.contains(curVisitToken)
&& (System.currentTimeMillis() -
tubeBroker.getLastRegTime() > inValidTokenCheckTimeMs)) {
-
result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Visit Authorized Token is invalid!");
- return result;
+ return result.isSuccess();
}
}
}
if ((isProduce && !enableProduceAuthenticate)
|| (!isProduce && !enableConsumeAuthenticate)) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
if (TStringUtils.isBlank(authorizedInfo.getAuthAuthorizedToken())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"authAuthorizedToken is Blank!");
- return result;
+ return result.isSuccess();
}
// process authAuthorizedToken info from certificate center begin
// process authAuthorizedToken info from certificate center end
// set userName, reAuth info
- result.setSuccessResult("", "", false);
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
@Override
- public CertifiedResult validConsumeAuthorizeInfo(final String userName,
final String groupName,
- final String topicName, final Set<String> msgTypeLst,
- boolean isRegister, String clientIp) {
- CertifiedResult result = new CertifiedResult();
+ public boolean validConsumeAuthorizeInfo(String userName, String
groupName, String topicName,
+ Set<String> msgTypeLst, boolean isRegister, String clientIp,
ProcessResult result) {
if (!enableConsumeAuthorize) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
-
// process authorize from authorize center begin
// process authorize from authorize center end
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
@Override
- public CertifiedResult validProduceAuthorizeInfo(final String userName,
final String topicName,
- final String msgType, String clientIp) {
- CertifiedResult result = new CertifiedResult();
+ public boolean validProduceAuthorizeInfo(String userName, String topicName,
+ String msgType, String clientIp, ProcessResult result) {
if (!enableProduceAuthorize) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
-
// process authorize from authorize center begin
// process authorize from authorize center end
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
@Override
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java
index da715adcc..1f8b56bfd 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.master.MasterConfig;
@@ -34,140 +35,138 @@ public class SimpleCertificateMasterHandler implements
CertificateMasterHandler
}
@Override
- public CertifiedResult identityValidBrokerInfo(
- final ClientMaster.MasterCertificateInfo certificateInfo) {
- CertifiedResult result = new CertifiedResult();
+ public boolean identityValidBrokerInfo(
+ ClientMaster.MasterCertificateInfo certificateInfo, ProcessResult
result) {
if (!masterConfig.isNeedBrokerVisitAuth()) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
if (certificateInfo == null) {
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
ClientMaster.AuthenticateInfo authInfo = certificateInfo.getAuthInfo();
if (authInfo == null) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: AuthenticateInfo is null!");
- return result;
+ return result.isSuccess();
}
if (TStringUtils.isBlank(authInfo.getUserName())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: authInfo.userName is Blank!");
- return result;
+ return result.isSuccess();
}
String inUserName = authInfo.getUserName().trim();
if (TStringUtils.isBlank(authInfo.getSignature())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: authInfo.signature is Blank!");
- return result;
+ return result.isSuccess();
}
String inSignature = authInfo.getSignature().trim();
if (!inUserName.equals(masterConfig.getVisitName())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: userName is not equal in
authenticateToken!");
- return result;
+ return result.isSuccess();
}
if (Math.abs(System.currentTimeMillis() - authInfo.getTimestamp()) >
masterConfig
.getAuthValidTimeStampPeriodMs()) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: timestamp out of effective period in
authenticateToken!");
- return result;
+ return result.isSuccess();
}
String signature =
TStringUtils.getAuthSignature(inUserName,
masterConfig.getVisitPassword(),
authInfo.getTimestamp(), authInfo.getNonce());
if (!inSignature.equals(signature)) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: userName or password is not correct!");
- return result;
+ return result.isSuccess();
}
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
@Override
- public CertifiedResult identityValidUserInfo(final
ClientMaster.MasterCertificateInfo certificateInfo,
- boolean isProduce) {
+ public boolean identityValidUserInfo(ClientMaster.MasterCertificateInfo
certificateInfo,
+ boolean isProduce, ProcessResult result) {
String inUserName = "";
String authorizedToken = "";
String othParams = "";
- CertifiedResult result = new CertifiedResult();
if (isProduce) {
if (!masterConfig.isStartProduceAuthenticate()) {
- result.setSuccessResult(inUserName, authorizedToken);
- return result;
+ result.setSuccResult(new CertifiedInfo(inUserName,
authorizedToken));
+ return result.isSuccess();
}
} else {
if (!masterConfig.isStartConsumeAuthenticate()) {
- result.setSuccessResult(inUserName, authorizedToken);
- return result;
+ result.setSuccResult(new CertifiedInfo(inUserName,
authorizedToken));
+ return result.isSuccess();
}
}
if (certificateInfo == null) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Server required MasterCertificateInfo!");
- return result;
+ return result.isSuccess();
}
ClientMaster.AuthenticateInfo authInfo = certificateInfo.getAuthInfo();
if (authInfo == null) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: AuthenticateInfo is null!");
- return result;
+ return result.isSuccess();
}
if (TStringUtils.isBlank(authInfo.getUserName())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: authInfo.userName is Blank!");
- return result;
+ return result.isSuccess();
}
inUserName = authInfo.getUserName().trim();
if (TStringUtils.isNotBlank(authInfo.getOthParams())) {
othParams = authInfo.getOthParams().trim();
}
if (TStringUtils.isBlank(authInfo.getSignature())) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: authInfo.signature is Blank!");
- return result;
+ return result.isSuccess();
}
String inSignature = authInfo.getSignature().trim();
if (Math.abs(System.currentTimeMillis() - authInfo.getTimestamp()) >
masterConfig
.getAuthValidTimeStampPeriodMs()) {
- result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE,
+ result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE,
"Illegal value: timestamp out of effective period in
authenticateToken!");
- return result;
+ return result.isSuccess();
}
// get username and password from certificate center begin
// get username and password from certificate center end
// get identified userName and authorized token info and return
- result.setSuccessResult(inUserName, authorizedToken);
- return result;
+ result.setSuccResult(new CertifiedInfo(inUserName, authorizedToken));
+ return result.isSuccess();
}
@Override
- public CertifiedResult validProducerAuthorizeInfo(String userName,
Set<String> topics, String clientIp) {
- CertifiedResult result = new CertifiedResult();
+ public boolean validProducerAuthorizeInfo(String userName, Set<String>
topics,
+ String clientIp, ProcessResult result) {
if (!masterConfig.isStartProduceAuthorize()) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
// call authorize center begin
// call authorize center end
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
@Override
- public CertifiedResult validConsumerAuthorizeInfo(String userName, String
groupName, Set<String> topics,
- Map<String, TreeSet<String>> topicConds, String clientIp) {
- CertifiedResult result = new CertifiedResult();
+ public boolean validConsumerAuthorizeInfo(String userName, String
groupName, Set<String> topics,
+ Map<String, TreeSet<String>> topicConds, String clientIp,
ProcessResult result) {
if (!masterConfig.isStartProduceAuthorize()) {
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
// call authorize center begin
// call authorize center end
- result.setSuccessResult("", "");
- return result;
+ result.setSuccResult(new CertifiedInfo());
+ return result.isSuccess();
}
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 2e1a67075..9cb7ced13 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -90,7 +90,7 @@ import
org.apache.inlong.tubemq.corerpc.exception.StandbyException;
import org.apache.inlong.tubemq.corerpc.service.MasterService;
import org.apache.inlong.tubemq.server.Stoppable;
import
org.apache.inlong.tubemq.server.common.aaaserver.CertificateMasterHandler;
-import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult;
+import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedInfo;
import
org.apache.inlong.tubemq.server.common.aaaserver.SimpleCertificateMasterHandler;
import org.apache.inlong.tubemq.server.common.exception.HeartbeatException;
import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager;
@@ -314,13 +314,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
RegisterResponseM2P.Builder builder = RegisterResponseM2P.newBuilder();
builder.setSuccess(false);
builder.setBrokerCheckSum(-1);
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -345,12 +344,10 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
checkNodeStatus(producerId, strBuff);
- CertifiedResult authorizeResult =
- serverAuthHandler.validProducerAuthorizeInfo(
- certResult.userName, transTopicSet, rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if (!serverAuthHandler.validProducerAuthorizeInfo(
+ certifiedInfo.getUserName(), transTopicSet, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientJdkVer = request.hasJdkVersion() ?
request.getJdkVersion() : "";
@@ -365,7 +362,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
builder.setAuthorizedInfo(genAuthorizedInfo(
- certResult.authorizedToken, false).build());
+ certifiedInfo.getAuthorizedToken(), false).build());
ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
buildApprovedClientConfig(request.getAppdConfig(),
prodTopicConfigTuple);
if (clientConfigBuilder != null) {
@@ -398,13 +395,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
HeartResponseM2P.Builder builder = HeartResponseM2P.newBuilder();
builder.setSuccess(false);
builder.setBrokerCheckSum(-1);
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -430,12 +426,10 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
final long inBrokerCheckSum = request.getBrokerCheckSum();
checkNodeStatus(producerId, strBuff);
- CertifiedResult authorizeResult =
- serverAuthHandler.validProducerAuthorizeInfo(
- certResult.userName, transTopicSet, rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if (!serverAuthHandler.validProducerAuthorizeInfo(
+ certifiedInfo.getUserName(), transTopicSet, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
try {
@@ -454,7 +448,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
final Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple =
getTopicConfigureInfos(producerId, false);
builder.setAuthorizedInfo(genAuthorizedInfo(
- certResult.authorizedToken, false).build());
+ certifiedInfo.getAuthorizedToken(), false).build());
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
if (brokerStaticInfo.getF0() != inBrokerCheckSum) {
builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
@@ -498,11 +492,9 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
final StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2P.Builder builder = CloseResponseM2P.newBuilder();
builder.setSuccess(false);
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
true, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
@@ -540,13 +532,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder();
builder.setSuccess(false);
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -618,12 +609,10 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData();
- CertifiedResult authorizeResult =
-
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, reqTopicSet, reqTopicConditions,
rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, reqTopicSet, reqTopicConditions, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// need removed for authorize center begin
@@ -708,7 +697,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
}
}
-
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(),
false));
builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -736,13 +725,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
HeartResponseM2C.Builder builder = HeartResponseM2C.newBuilder();
builder.setSuccess(false);
// identity valid
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -764,13 +752,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
// authorize check
- CertifiedResult authorizeResult =
-
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, consumeGroupInfo.getTopicSet(),
- consumeGroupInfo.getTopicConditions(), rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, consumeGroupInfo.getTopicSet(),
+ consumeGroupInfo.getTopicConditions(), rmtAddress, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// heartbeat check
@@ -879,7 +865,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
}
}
-
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(),
false));
builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -904,11 +890,9 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2C.Builder builder = CloseResponseM2C.newBuilder();
builder.setSuccess(false);
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
@@ -949,21 +933,19 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
final String rmtAddress,
boolean overtls) throws Exception {
// #lizard forgives
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2B.Builder builder = RegisterResponseM2B.newBuilder();
builder.setSuccess(false);
builder.setStopRead(false);
builder.setStopWrite(false);
builder.setTakeConfInfo(false);
// auth
- CertifiedResult cfResult =
-
serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
- if (!cfResult.result) {
- builder.setErrCode(cfResult.errCode);
- builder.setErrMsg(cfResult.errInfo);
+ if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(),
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- ProcessResult result = new ProcessResult();
- final StringBuilder strBuff = new StringBuilder(512);
// get clientId and check valid
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
@@ -1061,6 +1043,8 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
final String rmtAddress,
boolean overtls) throws Exception {
// #lizard forgives
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
// set response field
HeartResponseM2B.Builder builder = HeartResponseM2B.newBuilder();
builder.setSuccess(false);
@@ -1074,15 +1058,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.setCurBrokerConfId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setConfCheckSumId(TBaseConstants.META_VALUE_UNDEFINED);
// identity broker info
- CertifiedResult certResult =
-
serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(),
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- ProcessResult result = new ProcessResult();
- final StringBuilder strBuff = new StringBuilder(512);
if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -1175,11 +1155,9 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder();
builder.setSuccess(false);
- CertifiedResult cfResult =
-
serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
- if (!cfResult.result) {
- builder.setErrCode(cfResult.errCode);
- builder.setErrMsg(cfResult.errInfo);
+ if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(),
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff,
result)) {
@@ -1216,13 +1194,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
ProcessResult result = new ProcessResult();
final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2CV2.Builder builder =
RegisterResponseM2CV2.newBuilder();
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -1298,12 +1275,10 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- CertifiedResult authorizeResult =
-
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, reqTopicSet, reqTopicConditions,
rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, reqTopicSet, reqTopicConditions, rmtAddress,
result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
Integer lid = null;
@@ -1366,7 +1341,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
}
builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo,
inConsumerInfo, opsTaskInfo));
-
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(),
false));
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
return builder.build();
@@ -1390,13 +1365,12 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
// response
HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder();
// identity valid
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
+ CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -1436,13 +1410,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
// authorize check
- CertifiedResult authorizeResult =
-
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, consumeGroupInfo.getTopicSet(),
- consumeGroupInfo.getTopicConditions(), rmtAddress);
- if (!authorizeResult.result) {
- builder.setErrCode(authorizeResult.errCode);
- builder.setErrMsg(authorizeResult.errInfo);
+ if
(!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
+ groupName, consumeGroupInfo.getTopicSet(),
+ consumeGroupInfo.getTopicConditions(), rmtAddress, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// heartbeat check
@@ -1488,7 +1460,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
}
builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo,
inConsumerInfo, opsTaskInfo));
-
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+
builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(),
false));
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
return builder.build();
@@ -1510,11 +1482,9 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
ProcessResult reslut = new ProcessResult();
StringBuilder strBuff = new StringBuilder(512);
GetPartMetaResponseM2C.Builder builder =
GetPartMetaResponseM2C.newBuilder();
- CertifiedResult certResult =
- serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false);
- if (!certResult.result) {
- builder.setErrCode(certResult.errCode);
- builder.setErrMsg(certResult.errInfo);
+ if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(),
false, reslut)) {
+ builder.setErrCode(reslut.getErrCode());
+ builder.setErrMsg(reslut.getErrMsg());
return builder.build();
}
if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff,
reslut)) {