This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2fc3b39  feat(protocol): unify name of request header (#284)
2fc3b39 is described below

commit 2fc3b399553b6ba35612f46d2cd9a19869a77ad4
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Nov 4 10:19:30 2019 +0800

    feat(protocol): unify name of request header (#284)
    
    - unify request header format: XXXRequestHeader
---
 consumer/consumer.go      |  8 ++++----
 consumer/offset_store.go  |  4 ++--
 consumer/push_consumer.go |  4 ++--
 internal/client.go        |  8 ++++----
 internal/mock_client.go   |  4 ++--
 internal/request.go       | 40 ++++++++++++++++++++--------------------
 internal/route.go         |  2 +-
 internal/trace.go         |  2 +-
 producer/producer.go      |  2 +-
 9 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 55fe08f..f6a0bcd 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -828,7 +828,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
                        queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
        }
 
-       pullRequest := &internal.PullMessageRequest{
+       pullRequest := &internal.PullMessageRequestHeader{
                ConsumerGroup: dc.consumerGroup,
                Topic:         queue.Topic,
                QueueId:       int32(queue.QueueId),
@@ -898,7 +898,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
        }
 
        if brokerAddr != "" {
-               req := &internal.GetConsumerList{
+               req := &internal.GetConsumerListRequestHeader{
                        ConsumerGroup: dc.consumerGroup,
                }
                cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
@@ -937,7 +937,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
                return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
        }
 
-       request := &internal.GetMaxOffsetRequest{
+       request := &internal.GetMaxOffsetRequestHeader{
                Topic:   mq.Topic,
                QueueId: mq.QueueId,
        }
@@ -966,7 +966,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
                return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
        }
 
-       request := &internal.SearchOffsetRequest{
+       request := &internal.SearchOffsetRequestHeader{
                Topic:     mq.Topic,
                QueueId:   mq.QueueId,
                Timestamp: timestamp,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 2db940c..e86ec3b 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -330,7 +330,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
        if broker == "" {
                return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
        }
-       queryOffsetRequest := &internal.QueryConsumerOffsetRequest{
+       queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
                ConsumerGroup: group,
                Topic:         mq.Topic,
                QueueId:       mq.QueueId,
@@ -363,7 +363,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
                return fmt.Errorf("broker: %s address not found", mq.BrokerName)
        }
 
-       updateOffsetRequest := &internal.UpdateConsumerOffsetRequest{
+       updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
                ConsumerGroup: group,
                Topic:         mq.Topic,
                QueueId:       mq.QueueId,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d75f7da..600f4ed 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -500,7 +500,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 
                sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
 
-               pullRequest := &internal.PullMessageRequest{
+               pullRequest := &internal.PullMessageRequestHeader{
                        ConsumerGroup:  pc.consumerGroup,
                        Topic:          request.mq.Topic,
                        QueueId:        int32(request.mq.QueueId),
@@ -616,7 +616,7 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.Messag
 }
 
 func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
-       req := &internal.ConsumerSendMsgBackRequest{
+       req := &internal.ConsumerSendMsgBackRequestHeader{
                Group:             pc.consumerGroup,
                OriginTopic:       msg.Topic,
                Offset:            msg.CommitLogOffset,
diff --git a/internal/client.go b/internal/client.go
index e9cc59a..ee77b97 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -143,8 +143,8 @@ type RMQClient interface {
 
        RegisterConsumer(group string, consumer InnerConsumer) error
        UnregisterConsumer(group string)
-       PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error)
-       PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error
+       PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
+       PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
        RebalanceImmediately()
        UpdatePublishInfo(topic string, data *TopicRouteData)
 }
@@ -465,7 +465,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 }
 
 // PullMessage with sync
-func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
        res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
        if err != nil {
@@ -520,7 +520,7 @@ func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *rem
 }
 
 // PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
        return nil
 }
 
diff --git a/internal/mock_client.go b/internal/mock_client.go
index a216e93..730c073 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -363,7 +363,7 @@ func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomo
 }
 
 // PullMessage mocks base method
-func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
        ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
        ret0, _ := ret[0].(*primitive.PullResult)
        ret1, _ := ret[1].(error)
@@ -376,7 +376,7 @@ func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request inter
 }
 
 // PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
        ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
        ret0, _ := ret[0].(error)
        return ret0
diff --git a/internal/request.go b/internal/request.go
index 3dce875..50af13a 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -45,7 +45,7 @@ const (
        ReqConsumeMessageDirectly   = int16(309)
 )
 
-type SendMessageRequest struct {
+type SendMessageRequestHeader struct {
        ProducerGroup     string `json:"producerGroup"`
        Topic             string `json:"topic"`
        QueueId           int    `json:"queueId"`
@@ -59,7 +59,7 @@ type SendMessageRequest struct {
        Batch             bool
 }
 
-func (request *SendMessageRequest) Encode() map[string]string {
+func (request *SendMessageRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["producerGroup"] = request.ProducerGroup
        maps["topic"] = request.Topic
@@ -78,7 +78,7 @@ func (request *SendMessageRequest) Encode() map[string]string {
        return maps
 }
 
-func (request *SendMessageRequest) Decode(properties map[string]string) error {
+func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
        return nil
 }
 
@@ -144,7 +144,7 @@ func (request *CheckTransactionStateRequestHeader) Decode(ext map[string]string)
        }
 }
 
-type ConsumerSendMsgBackRequest struct {
+type ConsumerSendMsgBackRequestHeader struct {
        Group             string `json:"group"`
        Offset            int64  `json:"offset"`
        DelayLevel        int    `json:"delayLevel"`
@@ -154,7 +154,7 @@ type ConsumerSendMsgBackRequest struct {
        MaxReconsumeTimes int32  `json:"maxReconsumeTimes"`
 }
 
-func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
+func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["group"] = request.Group
        maps["offset"] = strconv.FormatInt(request.Offset, 10)
@@ -167,7 +167,7 @@ func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
        return maps
 }
 
-type PullMessageRequest struct {
+type PullMessageRequestHeader struct {
        ConsumerGroup        string        `json:"consumerGroup"`
        Topic                string        `json:"topic"`
        QueueId              int32         `json:"queueId"`
@@ -181,7 +181,7 @@ type PullMessageRequest struct {
        ExpressionType       string        `json:"expressionType"`
 }
 
-func (request *PullMessageRequest) Encode() map[string]string {
+func (request *PullMessageRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["consumerGroup"] = request.ConsumerGroup
        maps["topic"] = request.Topic
@@ -197,35 +197,35 @@ func (request *PullMessageRequest) Encode() map[string]string {
        return maps
 }
 
-type GetConsumerList struct {
+type GetConsumerListRequestHeader struct {
        ConsumerGroup string `json:"consumerGroup"`
 }
 
-func (request *GetConsumerList) Encode() map[string]string {
+func (request *GetConsumerListRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["consumerGroup"] = request.ConsumerGroup
        return maps
 }
 
-type GetMaxOffsetRequest struct {
+type GetMaxOffsetRequestHeader struct {
        Topic   string `json:"topic"`
        QueueId int    `json:"queueId"`
 }
 
-func (request *GetMaxOffsetRequest) Encode() map[string]string {
+func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["topic"] = request.Topic
        maps["queueId"] = strconv.Itoa(request.QueueId)
        return maps
 }
 
-type QueryConsumerOffsetRequest struct {
+type QueryConsumerOffsetRequestHeader struct {
        ConsumerGroup string `json:"consumerGroup"`
        Topic         string `json:"topic"`
        QueueId       int    `json:"queueId"`
 }
 
-func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["consumerGroup"] = request.ConsumerGroup
        maps["topic"] = request.Topic
@@ -233,13 +233,13 @@ func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
        return maps
 }
 
-type SearchOffsetRequest struct {
+type SearchOffsetRequestHeader struct {
        Topic     string `json:"topic"`
        QueueId   int    `json:"queueId"`
        Timestamp int64  `json:"timestamp"`
 }
 
-func (request *SearchOffsetRequest) Encode() map[string]string {
+func (request *SearchOffsetRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["Topic"] = request.Topic
        maps["QueueId"] = strconv.Itoa(request.QueueId)
@@ -247,14 +247,14 @@ func (request *SearchOffsetRequest) Encode() map[string]string {
        return maps
 }
 
-type UpdateConsumerOffsetRequest struct {
+type UpdateConsumerOffsetRequestHeader struct {
        ConsumerGroup string `json:"consumerGroup"`
        Topic         string `json:"topic"`
        QueueId       int    `json:"queueId"`
        CommitOffset  int64  `json:"commitOffset"`
 }
 
-func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["consumerGroup"] = request.ConsumerGroup
        maps["topic"] = request.Topic
@@ -263,16 +263,16 @@ func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
        return maps
 }
 
-type GetRouteInfoRequest struct {
+type GetRouteInfoRequestHeader struct {
        Topic string `json:"topic"`
 }
 
-func (request *GetRouteInfoRequest) Encode() map[string]string {
+func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["topic"] = request.Topic
        return maps
 }
 
-func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
+func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
        return nil
 }
diff --git a/internal/route.go b/internal/route.go
index 39551be..716a2f7 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -310,7 +310,7 @@ func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
 }
 
 func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
-       request := &GetRouteInfoRequest{
+       request := &GetRouteInfoRequestHeader{
                Topic: topic,
        }
 
diff --git a/internal/trace.go b/internal/trace.go
index 1392e77..00074ee 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -440,7 +440,7 @@ func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
 
 func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
        msg *primitive.Message) *remote.RemotingCommand {
-       req := &SendMessageRequest{
+       req := &SendMessageRequestHeader{
                ProducerGroup: TraceGroupName,
                Topic:         mq.Topic,
                QueueId:       mq.QueueId,
diff --git a/producer/producer.go b/producer/producer.go
index f0a995b..78cd8d9 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -275,7 +275,7 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
                }
        }
 
-       req := &internal.SendMessageRequest{
+       req := &internal.SendMessageRequestHeader{
                ProducerGroup:  p.group,
                Topic:          mq.Topic,
                QueueId:        mq.QueueId,

Reply via email to