This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 5790fc48 golang: Add namespace in Resource and metadata (#753)
5790fc48 is described below
commit 5790fc486a0e45f96b78e64550a54371c3deac61
Author: guyinyou <[email protected]>
AuthorDate: Thu May 16 15:32:35 2024 +0800
golang: Add namespace in Resource and metadata (#753)
* metadata和resource支持namespace字段
* add ut
---------
Co-authored-by: guyinyou <[email protected]>
---
golang/client.go | 5 ++-
golang/client_test.go | 6 ++--
golang/metadata/metadata.go | 10 +++---
golang/producer.go | 13 ++++---
golang/publishing_message.go | 8 +++--
.../metadata.go => publishing_message_test.go} | 42 ++++++++++------------
golang/simple_consumer.go | 15 +++++---
golang/simple_consumer_options.go | 3 +-
golang/transaction.go | 4 +--
9 files changed, 59 insertions(+), 47 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 71c48198..9f60e061 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context,
topic string, duration
func (cli *defaultClient) getQueryRouteRequest(topic string)
*v2.QueryRouteRequest {
return &v2.QueryRouteRequest{
Topic: &v2.Resource{
- Name: topic,
+ Name: topic,
+ ResourceNamespace: cli.config.NameSpace,
},
Endpoints: cli.accessPoint,
}
@@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context)
context.Context {
innerMD.VersionValue,
innerMD.ClintID,
cli.clientID,
+ innerMD.NameSpace,
+ cli.config.NameSpace,
innerMD.DateTime,
now,
innerMD.Authorization,
diff --git a/golang/client_test.go b/golang/client_test.go
index 4549bdfe..716ca8cf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -298,7 +298,8 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T)
{
func Test_routeEqual(t *testing.T) {
oldMq := &v2.MessageQueue{
Topic: &v2.Resource{
- Name: "topic-test",
+ Name: "topic-test",
+ ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
@@ -313,7 +314,8 @@ func Test_routeEqual(t *testing.T) {
}
newMq := &v2.MessageQueue{
Topic: &v2.Resource{
- Name: "topic-test",
+ Name: "topic-test",
+ ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
diff --git a/golang/metadata/metadata.go b/golang/metadata/metadata.go
index 5a8b919d..ccc6627c 100644
--- a/golang/metadata/metadata.go
+++ b/golang/metadata/metadata.go
@@ -18,11 +18,11 @@
package metadata
const (
- LanguageKey = "x-mq-language"
- ProtocolKey = "x-mq-protocol"
- RequestID = "x-mq-request-id"
- VersionKey = "x-mq-client-version"
- // NameSpace = "x-mq-namespace"
+ LanguageKey = "x-mq-language"
+ ProtocolKey = "x-mq-protocol"
+ RequestID = "x-mq-request-id"
+ VersionKey = "x-mq-client-version"
+ NameSpace = "x-mq-namespace"
DateTime = "x-mq-date-time"
ClintID = "x-mq-client-id"
Authorization = "authorization"
diff --git a/golang/producer.go b/golang/producer.go
index f2fbf590..3e878a93 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts
...ProducerOption) (Producer, error)
}
for _, topic := range po.topics {
topicResource := &v2.Resource{
- Name: topic,
+ Name: topic,
+ ResourceNamespace: config.NameSpace,
}
p.pSetting.topics.Store(topic, topicResource)
}
@@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs
[]*UnifiedMessage, txE
var err error
pubMessage = uMsg.pubMsg
if uMsg.pubMsg == nil {
- pubMessage, err = NewPublishingMessage(msg, p.pSetting,
txEnabled)
+ pubMessage, err = NewPublishingMessage(msg,
p.cli.config.NameSpace, p.pSetting, txEnabled)
if err != nil {
return nil, err
}
@@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs
[]*UnifiedMessage, txE
}
if _, ok := p.pSetting.topics.Load(topicName); !ok {
p.pSetting.topics.Store(topicName, &v2.Resource{
- Name: topicName,
+ Name: topicName,
+ ResourceNamespace: p.cli.config.NameSpace,
})
}
pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
@@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx
context.Context, msg *Message,
return nil, fmt.Errorf("producer is not running")
}
t := transaction.(*transactionImpl)
- pubMessage, err := t.tryAddMessage(msg)
+ pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
if err != nil {
return nil, err
}
@@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx
context.Context, endpoints *v2.Endp
ctx = p.cli.Sign(ctx)
request := &v2.EndTransactionRequest{
Topic: &v2.Resource{
- Name: messageCommon.topic,
+ Name: messageCommon.topic,
+ ResourceNamespace: p.cli.config.NameSpace,
},
MessageId: messageId,
TransactionId: transactionId,
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 456202ab..ea55dd19 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -26,6 +26,7 @@ import (
)
type PublishingMessage struct {
+ namespace string
msg *Message
encoding v2.Encoding
messageId string
@@ -33,7 +34,7 @@ type PublishingMessage struct {
traceContext *string
}
-var NewPublishingMessage = func(msg *Message, settings *producerSettings,
txEnabled bool) (*PublishingMessage, error) {
+var NewPublishingMessage = func(msg *Message, namespace string, settings
*producerSettings, txEnabled bool) (*PublishingMessage, error) {
if msg == nil {
return nil, fmt.Errorf("message is nil")
}
@@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings
*producerSettings, txEnab
// No need to compress message body.
pMsg.encoding = v2.Encoding_IDENTITY
+ pMsg.namespace = namespace
+
// Generate message id.
pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
// Normal message.
@@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message,
error) {
msg := &v2.Message{
Topic: &v2.Resource{
// ResourceNamespace: b.conn.Config().NameSpace,
- Name: pMsg.msg.Topic,
+ Name: pMsg.msg.Topic,
+ ResourceNamespace: pMsg.namespace,
},
SystemProperties: &v2.SystemProperties{
Keys: pMsg.msg.GetKeys(),
diff --git a/golang/metadata/metadata.go b/golang/publishing_message_test.go
similarity index 59%
copy from golang/metadata/metadata.go
copy to golang/publishing_message_test.go
index 5a8b919d..5a030e48 100644
--- a/golang/metadata/metadata.go
+++ b/golang/publishing_message_test.go
@@ -15,29 +15,23 @@
* limitations under the License.
*/
-package metadata
+package golang
-const (
- LanguageKey = "x-mq-language"
- ProtocolKey = "x-mq-protocol"
- RequestID = "x-mq-request-id"
- VersionKey = "x-mq-client-version"
- // NameSpace = "x-mq-namespace"
- DateTime = "x-mq-date-time"
- ClintID = "x-mq-client-id"
- Authorization = "authorization"
-)
+import "testing"
-const (
- LanguageValue = "GO"
- ProtocolValue = "v2"
- VersionValue = "5.0.1-rc1"
-)
-
-const (
- EncryptHeader = "MQv2-HMAC-SHA1"
- Rocketmq = "Rocketmq"
- Credential = "Credential"
- Signature = "Signature"
- SignedHeaders = "SignedHeaders"
-)
+func TestNewPublishingMessage(t *testing.T) {
+ namespace := "ns-test"
+ pSetting := &producerSettings{}
+ msg := &Message{}
+ pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
+ if err != nil {
+ t.Error(err)
+ }
+ v2Msg, err := pMsg.toProtobuf()
+ if err != nil {
+ t.Error(err)
+ }
+ if v2Msg.GetTopic().GetResourceNamespace() != namespace {
+ t.Error("namespace not equal")
+ }
+}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 8abfb413..f867696e 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer)
changeInvisibleDuration0(messageView *MessageVi
ctx := sc.cli.Sign(context.Background())
request := &v2.ChangeInvisibleDurationRequest{
Topic: &v2.Resource{
- Name: messageView.GetTopic(),
+ Name: messageView.GetTopic(),
+ ResourceNamespace: sc.cli.config.NameSpace,
},
Group: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: sc.cli.config.NameSpace,
},
ReceiptHandle: messageView.GetReceiptHandle(),
InvisibleDuration: durationpb.New(invisibleDuration),
@@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer)
wrapReceiveMessageRequest(batchSize int, messag
return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: sc.cli.config.NameSpace,
},
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
@@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer)
wrapAckMessageRequest(messageView *MessageView)
return &v2.AckMessageRequest{
Group: sc.scSettings.groupName,
Topic: &v2.Resource{
- Name: messageView.GetTopic(),
+ Name: messageView.GetTopic(),
+ ResourceNamespace: sc.cli.config.NameSpace,
},
Entries: []*v2.AckMessageEntry{
{
@@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts
...SimpleConsumerOption) (Simp
requestTimeout: sc.cli.opts.timeout,
groupName: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: config.NameSpace,
},
longPollingTimeout: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
diff --git a/golang/simple_consumer_options.go
b/golang/simple_consumer_options.go
index 857a4191..253b3c3d 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings
{
subscriptions := make([]*v2.SubscriptionEntry, 0)
for k, v := range sc.subscriptionExpressions {
topic := &v2.Resource{
- Name: k,
+ Name: k,
+ ResourceNamespace: sc.groupName.GetResourceNamespace(),
}
filterExpression := &v2.FilterExpression{
Expression: v.expression,
diff --git a/golang/transaction.go b/golang/transaction.go
index f6741101..89839616 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error {
return nil
}
-func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage,
error) {
+func (t *transactionImpl) tryAddMessage(message *Message, namespace string)
(*PublishingMessage, error) {
t.messagesLock.RLock()
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the
threshold: %d", MAX_MESSAGE_NUM)
@@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message)
(*PublishingMessage, e
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the
threshold: %d", MAX_MESSAGE_NUM)
}
- pubMessage, err := NewPublishingMessage(message,
t.producerImpl.(*defaultProducer).pSetting, true)
+ pubMessage, err := NewPublishingMessage(message, namespace,
t.producerImpl.(*defaultProducer).pSetting, true)
if err != nil {
return nil, err
}