This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
new e2e989f [TUBEMQ-409] Adjust the configuration items of protobuf (#311)
e2e989f is described below
commit e2e989f33670f5b6c455e84b2f86df2b66b5f5cd
Author: gosonzhang <[email protected]>
AuthorDate: Mon Nov 9 16:48:42 2020 +0800
[TUBEMQ-409] Adjust the configuration items of protobuf (#311)
Co-authored-by: gosonzhang <[email protected]>
---
tubemq-core/src/main/proto/BrokerService.proto | 65 ++++++++++++------
tubemq-core/src/main/proto/MasterService.proto | 77 ++++++++++++++++++++++
.../tubemq/server/broker/nodeinfo/AssignInfo.java | 19 ++++--
3 files changed, 134 insertions(+), 27 deletions(-)
diff --git a/tubemq-core/src/main/proto/BrokerService.proto
b/tubemq-core/src/main/proto/BrokerService.proto
index f5f170f..074c42d 100644
--- a/tubemq-core/src/main/proto/BrokerService.proto
+++ b/tubemq-core/src/main/proto/BrokerService.proto
@@ -141,22 +141,28 @@ message CommitOffsetResponseB2C {
optional int64 maxOffset = 5;
}
-message AssignTask {
+/*************************** v2 ****************************/
+
+
+message PartOpStatus {
+ required string partitionKey = 1;
+ optional int32 opStatus = 2;
+ optional int64 leftProcessTime = 3;
+ optional int64 rightProcessTime = 4;
+}
+
+message InitRegTask {
required int64 taskId = 1;
- required int64 rebalanceId = 2;
- required int32 rangeType = 3;
- optional int32 tupleValueType = 4;
- optional int64 leftValue = 5;
- optional int64 rightValue = 6;
- optional int32 assignOpStatus = 7;
+ required int32 rangeType = 2;
+ optional int32 tupleValueType = 3;
+ optional int64 leftValue = 4;
+ optional int64 rightValue = 5;
+ optional int32 opStatus = 6;
}
-message AssignStatus {
+message InitStatus {
required int64 taskId = 1;
- required int64 rebalanceId = 2;
- optional int32 assignOpStatus = 3;
- optional int64 leftProcessTime = 4;
- optional int64 rightProcessTime = 5;
+ repeated PartOpStatus partOpStatus = 2;
}
message RegisterV2RequestC2B {
@@ -168,15 +174,34 @@ message RegisterV2RequestC2B {
required int32 readStatus = 6;
repeated string filterCondStr = 7;
optional int32 qryPriorityId = 8;
- optional AssignTask assignTask = 9;
+ optional InitRegTask initRegTask = 9;
optional AuthorizedInfo authInfo = 10;
}
message RegisterV2ResponseB2C {
- required bool success = 1;
- required int32 errCode = 2;
- required string errMsg = 3;
- optional AssignStatus assignStatus = 4;
- optional int64 currOffset = 5;
- optional int64 maxOffset = 6;
-}
\ No newline at end of file
+ required int32 errCode = 1;
+ optional string errMsg = 2;
+ optional InitStatus initStatus = 3;
+ optional int64 currOffset = 4;
+ optional int64 maxOffset = 5;
+}
+
+message HeartBeatV2RequestC2B {
+ required string clientId = 1;
+ required string groupName = 2;
+ required int32 readStatus = 3;
+ /* brokerId:host:port:topic:partitionId:delayTimeStamp */
+ repeated string partitionInfo = 4;
+ optional int32 qryPriorityId = 5;
+ optional AuthorizedInfo authInfo = 6;
+}
+
+message HeartBeatV2ResponseB2C {
+ required int32 errCode = 1;
+ optional string errMsg = 2;
+ optional bool hasPartFailure = 3;
+ /* failCode:brokerId:host:port:topic:partitionId:delayTimeStamp */
+ repeated string failureInfo = 4;
+ optional InitStatus initStatus = 5;
+ optional bool requireAuth = 6;
+}
diff --git a/tubemq-core/src/main/proto/MasterService.proto
b/tubemq-core/src/main/proto/MasterService.proto
index cefd2d2..13a4b1e 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -290,6 +290,83 @@ message CloseResponseM2B {
/*************************** v2 ****************************/
+message ConsumeTarget {
+ required bool requireAssign = 1;
+ repeated string topicFilterInfo = 2;
+ optional string sessionKey = 3;
+ optional int32 totalCount = 4;
+ optional int32 rangeType = 5;
+ optional int32 tupleValueType = 6;
+ optional int32 confSelect = 7;
+ optional string assignPartitions = 8;
+}
+
+message PartInitInfo {
+ required string partitionKey = 1;
+ optional int32 rangeType = 2;
+ optional int32 tupleValueType = 3;
+ optional int64 leftValue = 4;
+ optional int64 rightValue = 5;
+ optional int32 opStatus = 6;
+}
+
+message RegInitInfo {
+ required int64 taskId = 1;
+ repeated PartInitInfo partInitInfo = 2;
+}
+
+message PolicyInfo {
+ optional int32 qryPriorityId = 1;
+ optional int64 defFlowCheckId = 2;
+ optional int64 groupFlowCheckId = 3;
+ optional string defFlowControlInfo = 4;
+ optional string groupFlowControlInfo = 5;
+}
+
+message RegisterV2RequestC2M {
+ required string clientId = 1;
+ required string groupName = 2;
+ required string hostName = 3;
+ required int64 sessionTime = 4;
+ required ConsumeTarget consumeTarget = 5;
+ /* consumerId@group-brokerId:host:port-topic:partitionId */
+ repeated string subscribeInfo = 6;
+ optional PolicyInfo policyInfo = 7;
+ optional RegInitInfo regInitInfo = 8;
+ optional MasterCertificateInfo authInfo = 9;
+ optional string jdkVersion = 10;
+}
+
+message RegisterV2ResponseM2C {
+ required int32 errCode = 1;
+ optional string errMsg = 2;
+ optional RegInitInfo regInitInfo = 3;
+ optional PolicyInfo policyInfo = 4;
+ optional MasterAuthorizedInfo authorizedInfo = 5;
+}
+
+message HeartV2RequestC2M {
+ required string clientId = 1;
+ required string groupName = 2;
+ required bool reportSubscribeInfo = 3;
+ repeated string subscribeInfo = 4;
+ optional EventProto event = 5;
+ optional RegInitInfo regInitInfo = 6;
+ optional PolicyInfo policyInfo = 7;
+ optional MasterCertificateInfo authInfo = 8;
+}
+
+message HeartV2ResponseM2C {
+ required int32 errCode = 1;
+ optional string errMsg = 2;
+ optional EventProto event = 3;
+ optional RegInitInfo regInitInfo = 4;
+ optional PolicyInfo policyInfo = 5;
+ optional bool requireAuth = 6;
+ optional MasterAuthorizedInfo authorizedInfo = 7;
+}
+
+
message DataStorePath {
required int32 recordId = 1;
required bool isActive = 2;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
index 10723a5..307feed 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
@@ -28,9 +28,8 @@ import
org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
* Consumer request's consume task assign info
*/
public class AssignInfo {
- private boolean spAssign = false;
+ private boolean spAssign = false; // support assign partition
private long taskId = TBaseConstants.META_VALUE_UNDEFINED;
- private long rebalanceId = TBaseConstants.META_VALUE_UNDEFINED;
private RangeType rangeType = RangeType.RANGE_SET_UNDEFINED;
private TupleType tupleType = TupleType.TUPLE_VALUE_TYPE_ALL_OFFSET;
private RangeTuple origTuple = new RangeTuple();
@@ -59,13 +58,12 @@ public class AssignInfo {
public AssignInfo(ClientBroker.RegisterV2RequestC2B request) {
this.spAssign = true;
- ClientBroker.AssignTask assignTask =
- request.hasAssignTask() ? request.getAssignTask() : null;
+ ClientBroker.InitRegTask assignTask =
+ request.hasInitRegTask() ? request.getInitRegTask() : null;
if (assignTask == null) {
return;
}
this.taskId = assignTask.getTaskId();
- this.rebalanceId = assignTask.getRebalanceId();
this.rangeType = RangeType.valueOf(assignTask.getRangeType());
if (assignTask.hasTupleValueType()) {
this.tupleType = TupleType.valueOf(assignTask.getTupleValueType());
@@ -76,8 +74,8 @@ public class AssignInfo {
if (assignTask.hasRightValue()) {
this.origTuple.setRightValue(assignTask.getRightValue());
}
- if (assignTask.hasAssignOpStatus()) {
- rcvStatus = TupleOpStatus.valueOf(assignTask.getAssignOpStatus());
+ if (assignTask.hasOpStatus()) {
+ rcvStatus = TupleOpStatus.valueOf(assignTask.getOpStatus());
}
}
@@ -89,4 +87,11 @@ public class AssignInfo {
return targetTuple.getLeftValue();
}
+ public long getRightOffset() {
+ if (spAssign && (rangeType == RangeType.RANGE_SET_RIGHT_DEFINED
+ || rangeType == RangeType.RANGE_SET_BOTH_DEFINED)) {
+ return targetTuple.getRightValue();
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
}