This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 2fc3b39 feat(protocol): unify name of request header (#284)
2fc3b39 is described below
commit 2fc3b399553b6ba35612f46d2cd9a19869a77ad4
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Nov 4 10:19:30 2019 +0800
feat(protocol): unify name of request header (#284)
- unify request header format: XXXRequestHeader
---
consumer/consumer.go | 8 ++++----
consumer/offset_store.go | 4 ++--
consumer/push_consumer.go | 4 ++--
internal/client.go | 8 ++++----
internal/mock_client.go | 4 ++--
internal/request.go | 40 ++++++++++++++++++++--------------------
internal/route.go | 2 +-
internal/trace.go | 2 +-
producer/producer.go | 2 +-
9 files changed, 37 insertions(+), 37 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 55fe08f..f6a0bcd 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -828,7 +828,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context,
queue *primitive.Messa
queue.BrokerName, brokerResult.BrokerVersion,
data.ExpType)
}
- pullRequest := &internal.PullMessageRequest{
+ pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: dc.consumerGroup,
Topic: queue.Topic,
QueueId: int32(queue.QueueId),
@@ -898,7 +898,7 @@ func (dc *defaultConsumer) findConsumerList(topic string)
[]string {
}
if brokerAddr != "" {
- req := &internal.GetConsumerList{
+ req := &internal.GetConsumerListRequestHeader{
ConsumerGroup: dc.consumerGroup,
}
cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
@@ -937,7 +937,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq
*primitive.MessageQueue) (int64, er
return -1, fmt.Errorf("the broker [%s] does not exist",
mq.BrokerName)
}
- request := &internal.GetMaxOffsetRequest{
+ request := &internal.GetMaxOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
}
@@ -966,7 +966,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq
*primitive.MessageQueue, t
return -1, fmt.Errorf("the broker [%s] does not exist",
mq.BrokerName)
}
- request := &internal.SearchOffsetRequest{
+ request := &internal.SearchOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 2db940c..e86ec3b 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -330,7 +330,7 @@ func (r *remoteBrokerOffsetStore)
fetchConsumeOffsetFromBroker(group string, mq
if broker == "" {
return int64(-1), fmt.Errorf("broker: %s address not found",
mq.BrokerName)
}
- queryOffsetRequest := &internal.QueryConsumerOffsetRequest{
+ queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
@@ -363,7 +363,7 @@ func (r *remoteBrokerOffsetStore)
updateConsumeOffsetToBroker(group string, mq p
return fmt.Errorf("broker: %s address not found", mq.BrokerName)
}
- updateOffsetRequest := &internal.UpdateConsumerOffsetRequest{
+ updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d75f7da..600f4ed 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -500,7 +500,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
- pullRequest := &internal.PullMessageRequest{
+ pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
@@ -616,7 +616,7 @@ func (pc *pushConsumer) sendMessageBack(brokerName string,
msg *primitive.Messag
}
func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt,
delayLevel int) *remote.RemotingCommand {
- req := &internal.ConsumerSendMsgBackRequest{
+ req := &internal.ConsumerSendMsgBackRequestHeader{
Group: pc.consumerGroup,
OriginTopic: msg.Topic,
Offset: msg.CommitLogOffset,
diff --git a/internal/client.go b/internal/client.go
index e9cc59a..ee77b97 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -143,8 +143,8 @@ type RMQClient interface {
RegisterConsumer(group string, consumer InnerConsumer) error
UnregisterConsumer(group string)
- PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error)
- PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error
+ PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
+ PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
RebalanceImmediately()
UpdatePublishInfo(topic string, data *TopicRouteData)
}
@@ -465,7 +465,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string,
cmd *remote.RemotingC
}
// PullMessage with sync
-func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd,
10*time.Second)
if err != nil {
@@ -520,7 +520,7 @@ func (c *rmqClient) decodeCommandCustomHeader(pr
*primitive.PullResult, cmd *rem
}
// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
return nil
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index a216e93..730c073 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -363,7 +363,7 @@ func (mr *MockRMQClientMockRecorder)
UnregisterConsumer(group interface{}) *gomo
}
// PullMessage mocks base method
-func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
ret0, _ := ret[0].(*primitive.PullResult)
ret1, _ := ret[1].(error)
@@ -376,7 +376,7 @@ func (mr *MockRMQClientMockRecorder) PullMessage(ctx,
brokerAddrs, request inter
}
// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
ret0, _ := ret[0].(error)
return ret0
diff --git a/internal/request.go b/internal/request.go
index 3dce875..50af13a 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -45,7 +45,7 @@ const (
ReqConsumeMessageDirectly = int16(309)
)
-type SendMessageRequest struct {
+type SendMessageRequestHeader struct {
ProducerGroup string `json:"producerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
@@ -59,7 +59,7 @@ type SendMessageRequest struct {
Batch bool
}
-func (request *SendMessageRequest) Encode() map[string]string {
+func (request *SendMessageRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["producerGroup"] = request.ProducerGroup
maps["topic"] = request.Topic
@@ -78,7 +78,7 @@ func (request *SendMessageRequest) Encode() map[string]string
{
return maps
}
-func (request *SendMessageRequest) Decode(properties map[string]string) error {
+func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
return nil
}
@@ -144,7 +144,7 @@ func (request *CheckTransactionStateRequestHeader)
Decode(ext map[string]string)
}
}
-type ConsumerSendMsgBackRequest struct {
+type ConsumerSendMsgBackRequestHeader struct {
Group string `json:"group"`
Offset int64 `json:"offset"`
DelayLevel int `json:"delayLevel"`
@@ -154,7 +154,7 @@ type ConsumerSendMsgBackRequest struct {
MaxReconsumeTimes int32 `json:"maxReconsumeTimes"`
}
-func (request *ConsumerSendMsgBackRequest) Encode() map[string]string {
+func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["group"] = request.Group
maps["offset"] = strconv.FormatInt(request.Offset, 10)
@@ -167,7 +167,7 @@ func (request *ConsumerSendMsgBackRequest) Encode()
map[string]string {
return maps
}
-type PullMessageRequest struct {
+type PullMessageRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
@@ -181,7 +181,7 @@ type PullMessageRequest struct {
ExpressionType string `json:"expressionType"`
}
-func (request *PullMessageRequest) Encode() map[string]string {
+func (request *PullMessageRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -197,35 +197,35 @@ func (request *PullMessageRequest) Encode()
map[string]string {
return maps
}
-type GetConsumerList struct {
+type GetConsumerListRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}
-func (request *GetConsumerList) Encode() map[string]string {
+func (request *GetConsumerListRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
return maps
}
-type GetMaxOffsetRequest struct {
+type GetMaxOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int `json:"queueId"`
}
-func (request *GetMaxOffsetRequest) Encode() map[string]string {
+func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
return maps
}
-type QueryConsumerOffsetRequest struct {
+type QueryConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
}
-func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -233,13 +233,13 @@ func (request *QueryConsumerOffsetRequest) Encode()
map[string]string {
return maps
}
-type SearchOffsetRequest struct {
+type SearchOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int `json:"queueId"`
Timestamp int64 `json:"timestamp"`
}
-func (request *SearchOffsetRequest) Encode() map[string]string {
+func (request *SearchOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["Topic"] = request.Topic
maps["QueueId"] = strconv.Itoa(request.QueueId)
@@ -247,14 +247,14 @@ func (request *SearchOffsetRequest) Encode()
map[string]string {
return maps
}
-type UpdateConsumerOffsetRequest struct {
+type UpdateConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int `json:"queueId"`
CommitOffset int64 `json:"commitOffset"`
}
-func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
@@ -263,16 +263,16 @@ func (request *UpdateConsumerOffsetRequest) Encode()
map[string]string {
return maps
}
-type GetRouteInfoRequest struct {
+type GetRouteInfoRequestHeader struct {
Topic string `json:"topic"`
}
-func (request *GetRouteInfoRequest) Encode() map[string]string {
+func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
return maps
}
-func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
+func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
return nil
}
diff --git a/internal/route.go b/internal/route.go
index 39551be..716a2f7 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -310,7 +310,7 @@ func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr
string) int32 {
}
func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
- request := &GetRouteInfoRequest{
+ request := &GetRouteInfoRequestHeader{
Topic: topic,
}
diff --git a/internal/trace.go b/internal/trace.go
index 1392e77..00074ee 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -440,7 +440,7 @@ func (td *traceDispatcher) findMq()
(*primitive.MessageQueue, string) {
func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
msg *primitive.Message) *remote.RemotingCommand {
- req := &SendMessageRequest{
+ req := &SendMessageRequestHeader{
ProducerGroup: TraceGroupName,
Topic: mq.Topic,
QueueId: mq.QueueId,
diff --git a/producer/producer.go b/producer/producer.go
index f0a995b..78cd8d9 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -275,7 +275,7 @@ func (p *defaultProducer) buildSendRequest(mq
*primitive.MessageQueue,
}
}
- req := &internal.SendMessageRequest{
+ req := &internal.SendMessageRequestHeader{
ProducerGroup: p.group,
Topic: mq.Topic,
QueueId: mq.QueueId,