This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
     new e2e989f  [TUBEMQ-409] Adjust the configuration items of protobuf (#311)
e2e989f is described below

commit e2e989f33670f5b6c455e84b2f86df2b66b5f5cd
Author: gosonzhang <[email protected]>
AuthorDate: Mon Nov 9 16:48:42 2020 +0800

    [TUBEMQ-409] Adjust the configuration items of protobuf (#311)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 tubemq-core/src/main/proto/BrokerService.proto     | 65 ++++++++++++------
 tubemq-core/src/main/proto/MasterService.proto     | 77 ++++++++++++++++++++++
 .../tubemq/server/broker/nodeinfo/AssignInfo.java  | 19 ++++--
 3 files changed, 134 insertions(+), 27 deletions(-)

diff --git a/tubemq-core/src/main/proto/BrokerService.proto b/tubemq-core/src/main/proto/BrokerService.proto
index f5f170f..074c42d 100644
--- a/tubemq-core/src/main/proto/BrokerService.proto
+++ b/tubemq-core/src/main/proto/BrokerService.proto
@@ -141,22 +141,28 @@ message CommitOffsetResponseB2C {
     optional int64 maxOffset = 5;
 }
 
-message AssignTask {
+/*************************** v2  ****************************/
+
+
+message PartOpStatus {
+    required string partitionKey = 1;
+    optional int32 opStatus = 2;
+    optional int64 leftProcessTime = 3;
+    optional int64 rightProcessTime = 4;
+}
+
+message InitRegTask {
     required int64 taskId = 1;
-    required int64 rebalanceId = 2;
-    required int32 rangeType = 3;
-    optional int32 tupleValueType = 4;
-    optional int64 leftValue = 5;
-    optional int64 rightValue = 6;
-    optional int32 assignOpStatus = 7;
+    required int32 rangeType = 2;
+    optional int32 tupleValueType = 3;
+    optional int64 leftValue = 4;
+    optional int64 rightValue = 5;
+    optional int32 opStatus = 6;
 }
 
-message AssignStatus {
+message InitStatus {
     required int64 taskId = 1;
-    required int64 rebalanceId = 2;
-    optional int32 assignOpStatus = 3;
-    optional int64 leftProcessTime = 4;
-    optional int64 rightProcessTime = 5;
+    repeated PartOpStatus partOpStatus = 2;
 }
 
 message RegisterV2RequestC2B {
@@ -168,15 +174,34 @@ message RegisterV2RequestC2B {
     required int32 readStatus = 6;
     repeated string filterCondStr = 7;
     optional int32 qryPriorityId = 8;
-    optional AssignTask assignTask = 9;
+    optional InitRegTask initRegTask = 9;
     optional AuthorizedInfo authInfo = 10;
 }
 
 message RegisterV2ResponseB2C {
-    required bool success = 1;
-    required int32 errCode = 2;
-    required string errMsg = 3;
-    optional AssignStatus assignStatus = 4;
-    optional int64 currOffset = 5;
-    optional int64 maxOffset = 6;
-}
\ No newline at end of file
+    required int32 errCode = 1;
+    optional string errMsg = 2;
+    optional InitStatus initStatus = 3;
+    optional int64 currOffset = 4;
+    optional int64 maxOffset = 5;
+}
+
+message HeartBeatV2RequestC2B {
+    required string clientId = 1;
+    required string groupName = 2;
+    required int32 readStatus = 3;
+    /* brokerId:host:port:topic:partitionId:delayTimeStamp */
+    repeated string partitionInfo = 4;
+    optional int32 qryPriorityId = 5;
+    optional AuthorizedInfo authInfo = 6;
+}
+
+message HeartBeatV2ResponseB2C {
+    required int32 errCode = 1;
+    optional string errMsg = 2;
+    optional bool hasPartFailure = 3;
+    /* failCode:brokerId:host:port:topic:partitionId:delayTimeStamp */
+    repeated string failureInfo = 4;
+    optional InitStatus initStatus = 5;
+    optional bool requireAuth = 6;
+}
diff --git a/tubemq-core/src/main/proto/MasterService.proto b/tubemq-core/src/main/proto/MasterService.proto
index cefd2d2..13a4b1e 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -290,6 +290,83 @@ message CloseResponseM2B {
 
 /*************************** v2  ****************************/
 
+message ConsumeTarget {
+    required bool    requireAssign = 1;
+    repeated string  topicFilterInfo = 2;
+    optional string  sessionKey = 3;
+    optional int32   totalCount = 4;
+    optional int32   rangeType = 5;
+    optional int32   tupleValueType = 6;
+    optional int32   confSelect = 7;
+    optional string  assignPartitions = 8;
+}
+
+message PartInitInfo {
+    required string partitionKey = 1;
+    optional int32  rangeType = 2;
+    optional int32  tupleValueType = 3;
+    optional int64  leftValue = 4;
+    optional int64  rightValue = 5;
+    optional int32  opStatus = 6;
+}
+
+message RegInitInfo {
+    required int64 taskId = 1;
+    repeated PartInitInfo partInitInfo = 2;
+}
+
+message PolicyInfo {
+    optional int32 qryPriorityId = 1;
+    optional int64 defFlowCheckId = 2;
+    optional int64 groupFlowCheckId = 3;
+    optional string defFlowControlInfo = 4;
+    optional string groupFlowControlInfo = 5;
+}
+
+message RegisterV2RequestC2M {
+    required string clientId = 1;
+    required string groupName = 2;
+    required string hostName = 3;
+    required int64 sessionTime = 4;
+    required ConsumeTarget consumeTarget = 5;
+    /* consumerId@group-brokerId:host:port-topic:partitionId */
+    repeated string subscribeInfo = 6;
+    optional PolicyInfo policyInfo = 7;
+    optional RegInitInfo regInitInfo = 8;
+    optional MasterCertificateInfo authInfo = 9;
+    optional string jdkVersion = 10;
+}
+
+message RegisterV2ResponseM2C {
+    required int32 errCode = 1;
+    optional string errMsg = 2;
+    optional RegInitInfo regInitInfo = 3;
+    optional PolicyInfo policyInfo = 4;
+    optional MasterAuthorizedInfo authorizedInfo = 5;
+}
+
+message HeartV2RequestC2M {
+    required string clientId = 1;
+    required string groupName = 2;
+    required bool reportSubscribeInfo = 3;
+    repeated string subscribeInfo = 4;
+    optional EventProto event = 5;
+    optional RegInitInfo regInitInfo = 6;
+    optional PolicyInfo policyInfo = 7;
+    optional MasterCertificateInfo authInfo = 8;
+}
+
+message HeartV2ResponseM2C {
+    required int32 errCode = 1;
+    optional string errMsg = 2;
+    optional EventProto event = 3;
+    optional RegInitInfo regInitInfo = 4;
+    optional PolicyInfo policyInfo = 5;
+    optional bool requireAuth = 6;
+    optional MasterAuthorizedInfo authorizedInfo = 7;
+}
+
+
 message DataStorePath {
     required int32 recordId = 1;
     required bool isActive = 2;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
index 10723a5..307feed 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
@@ -28,9 +28,8 @@ import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
  * Consumer request's consume task assign info
  */
 public class AssignInfo {
-    private boolean spAssign = false;
+    private boolean spAssign = false; // support assign partition
     private long taskId = TBaseConstants.META_VALUE_UNDEFINED;
-    private long rebalanceId = TBaseConstants.META_VALUE_UNDEFINED;
     private RangeType rangeType = RangeType.RANGE_SET_UNDEFINED;
     private TupleType tupleType = TupleType.TUPLE_VALUE_TYPE_ALL_OFFSET;
     private RangeTuple origTuple = new RangeTuple();
@@ -59,13 +58,12 @@ public class AssignInfo {
 
     public AssignInfo(ClientBroker.RegisterV2RequestC2B request) {
         this.spAssign = true;
-        ClientBroker.AssignTask assignTask =
-                request.hasAssignTask() ? request.getAssignTask() : null;
+        ClientBroker.InitRegTask assignTask =
+                request.hasInitRegTask() ? request.getInitRegTask() : null;
         if (assignTask == null) {
             return;
         }
         this.taskId = assignTask.getTaskId();
-        this.rebalanceId = assignTask.getRebalanceId();
         this.rangeType = RangeType.valueOf(assignTask.getRangeType());
         if (assignTask.hasTupleValueType()) {
             this.tupleType = TupleType.valueOf(assignTask.getTupleValueType());
@@ -76,8 +74,8 @@ public class AssignInfo {
         if (assignTask.hasRightValue()) {
             this.origTuple.setRightValue(assignTask.getRightValue());
         }
-        if (assignTask.hasAssignOpStatus()) {
-            rcvStatus = TupleOpStatus.valueOf(assignTask.getAssignOpStatus());
+        if (assignTask.hasOpStatus()) {
+            rcvStatus = TupleOpStatus.valueOf(assignTask.getOpStatus());
         }
     }
 
@@ -89,4 +87,11 @@ public class AssignInfo {
         return targetTuple.getLeftValue();
     }
 
+    public long getRightOffset() {
+        if (spAssign && (rangeType == RangeType.RANGE_SET_RIGHT_DEFINED
+                || rangeType == RangeType.RANGE_SET_BOTH_DEFINED)) {
+            return targetTuple.getRightValue();
+        }
+        return TBaseConstants.META_VALUE_UNDEFINED;
+    }
 }

Reply via email to