This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new dd3dbcc [ISSUE #1178] feat:pull consumer support Assign subscription
type
dd3dbcc is described below
commit dd3dbccb08087eaaa2e366066b97bbdc572b8911
Author: muyun.cyt <[email protected]>
AuthorDate: Mon Dec 2 11:33:21 2024 +0800
[ISSUE #1178] feat:pull consumer support Assign subscription type
1.Pull Consumer Support Assign Subscription Type.
2.add several apis for Assign Sub Type:
SeekOffset/Assign/OffsetForTimestamp/GetTopicRouteInfo
---
api.go | 11 +++
consumer/consumer.go | 8 ++
consumer/mock_offset_store.go | 4 +-
consumer/pull_consumer.go | 125 ++++++++++++++++++++++++++++-
consumer/statistics_test.go | 4 +-
errors/errors.go | 3 +
examples/consumer/pull/poll_assign/main.go | 115 ++++++++++++++++++++++++++
examples/consumer/tls/main.go | 118 +++++++++++++--------------
examples/producer/rpc/async/main.go | 1 -
examples/producer/tls/main.go | 124 ++++++++++++++--------------
10 files changed, 384 insertions(+), 129 deletions(-)
diff --git a/api.go b/api.go
index b0a203e..c4b5c08 100644
--- a/api.go
+++ b/api.go
@@ -82,6 +82,8 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer,
error) {
type PullConsumer interface {
// Start the PullConsumer for consuming message
Start() error
+ // GetTopicRouteInfo get topic route info
+ GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error)
// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector) error
@@ -89,6 +91,9 @@ type PullConsumer interface {
// Unsubscribe a topic
Unsubscribe(topic string) error
+ // Assign assign message queue to consumer
+ Assign(topic string, mqs []*primitive.MessageQueue) error
+
// Shutdown the PullConsumer, all offset of MessageQueue will be commit
to broker before process exit
Shutdown() error
@@ -104,6 +109,12 @@ type PullConsumer interface {
// PullFrom pull messages of queue from the offset to offset + numbers
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset
int64, numbers int) (*primitive.PullResult, error)
+ // SeekOffset seek offset for specific queue
+ SeekOffset(queue *primitive.MessageQueue, offset int64)
+
+ // OffsetForTimestamp get offset of specific queue with timestamp
+ OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64)
(int64, error)
+
// UpdateOffset updateOffset update offset of queue in mem
UpdateOffset(queue *primitive.MessageQueue, offset int64) error
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 98eb17b..9e9bedb 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error {
return nil
}
+func (dc *defaultConsumer) isRunning() bool {
+ return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
+}
+
+func (dc *defaultConsumer) isStopped() bool {
+ return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
+}
+
func (dc *defaultConsumer) persistConsumerOffset() error {
err := dc.makeSureStateOK()
if err != nil {
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index 1d7c3bd..c122927 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -117,7 +117,7 @@ func (mr *MockOffsetStoreMockRecorder) update(mq, offset,
increaseOnly interface
}
// getMQOffsetMap mocks base method
-func (m *MockOffsetStore) getMQOffsetMap(topic string)
map[primitive.MessageQueue]int64 {
+func (m *MockOffsetStore) getMQOffsetMap(topic string)
map[primitive.MessageQueue]int64 {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "getMQOffsetMap", topic)
ret0, _ := ret[0].(map[primitive.MessageQueue]int64)
@@ -125,7 +125,7 @@ func (m *MockOffsetStore) getMQOffsetMap(topic string)
map[primitive.MessageQueu
}
// getMQOffsetMap indicates an expected call of getMQOffsetMap
-func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string)
*gomock.Call{
+func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string)
*gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getMQOffsetMap",
reflect.TypeOf((*MockOffsetStore)(nil).getMQOffsetMap), topic)
}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index c66ffb7..3258779 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -64,6 +64,14 @@ func (cr *ConsumeRequest) GetPQ() *processQueue {
return cr.processQueue
}
+type SubscriptionType int
+
+const (
+ None SubscriptionType = iota
+ Subscribe
+ Assign
+)
+
type defaultPullConsumer struct {
*defaultConsumer
@@ -71,9 +79,11 @@ type defaultPullConsumer struct {
selector MessageSelector
GroupName string
Model MessageModel
+ SubType SubscriptionType
UnitMode bool
nextQueueSequence int64
allocateQueues []*primitive.MessageQueue
+ mq2seekOffset sync.Map //
key:primitive.MessageQueue,value:seekOffset
done chan struct{}
closeOnce sync.Once
@@ -116,6 +126,7 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
defaultConsumer: dc,
done: make(chan struct{}, 1),
consumeRequestCache: make(chan *ConsumeRequest, 4),
+ GroupName: dc.option.GroupName,
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageConcurrently
@@ -123,11 +134,32 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
return c, nil
}
+func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string)
([]*primitive.MessageQueue, error) {
+ topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic)
+ value, exist :=
pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
+ if exist {
+ return value.([]*primitive.MessageQueue), nil
+ }
+ pc.client.UpdateTopicRouteInfo()
+ value, exist =
pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
+ if !exist {
+ return nil, errors2.ErrRouteNotFound
+ }
+ return value.([]*primitive.MessageQueue), nil
+}
+
func (pc *defaultPullConsumer) Subscribe(topic string, selector
MessageSelector) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
+ if pc.SubType == Assign {
+ return errors2.ErrSubscriptionType
+ }
+
+ if pc.SubType == None {
+ pc.SubType = Subscribe
+ }
topic = utils.WrapNamespace(pc.option.Namespace, topic)
data := buildSubscriptionData(topic, selector)
@@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string,
selector MessageSelector)
}
func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
+ if pc.SubType == Assign {
+ return errors2.ErrSubscriptionType
+ }
topic = utils.WrapNamespace(pc.option.Namespace, topic)
pc.subscriptionDataTable.Delete(topic)
return nil
}
+func (pc *defaultPullConsumer) Assign(topic string, mqs
[]*primitive.MessageQueue) error {
+ if pc.SubType == Subscribe {
+ return errors2.ErrSubscriptionType
+ }
+ if pc.SubType == None {
+ pc.SubType = Assign
+ }
+ topic = utils.WrapNamespace(pc.option.Namespace, topic)
+ data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll})
+ pc.topic = topic
+ pc.subscriptionDataTable.Store(topic, data)
+ oldQueues := pc.allocateQueues
+ pc.allocateQueues = mqs
+ rlog.Info("pull consumer assign new mqs", map[string]interface{}{
+ "topic": topic,
+ "group": pc.GroupName,
+ "oldMqs": oldQueues,
+ "newMqs": mqs,
+ })
+ if pc.isRunning() {
+ pc.Rebalance()
+ }
+ return nil
+}
+
+func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue,
originOffset int64) int64 {
+ if pc.SubType != Assign {
+ return originOffset
+ }
+ value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
+ if !exist {
+ return originOffset
+ } else {
+ nextOffset := value.(int64)
+ _ = pc.updateOffset(mq, nextOffset)
+ return nextOffset
+ }
+}
+
func (pc *defaultPullConsumer) Start() error {
var err error
pc.once.Do(func() {
@@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string {
}
func (pc *defaultPullConsumer) Rebalance() {
- pc.defaultConsumer.doBalance()
+ switch pc.SubType {
+ case Assign:
+ pc.RebalanceViaTopic()
+ break
+ case Subscribe:
+ pc.defaultConsumer.doBalance()
+ break
+ }
}
func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
- pc.defaultConsumer.doBalanceIfNotPaused()
+ switch pc.SubType {
+ case Assign:
+ pc.RebalanceViaTopic()
+ break
+ case Subscribe:
+ pc.defaultConsumer.doBalanceIfNotPaused()
+ break
+ }
+}
+
+func (pc *defaultPullConsumer) RebalanceViaTopic() {
+ changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic,
pc.allocateQueues)
+ if changed {
+ rlog.Info("PullConsumer rebalance result changed ",
map[string]interface{}{
+ rlog.LogKeyAllocateMessageQueue: pc.allocateQueues,
+ })
+ }
}
func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool)
*internal.ConsumerRunningInfo {
@@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string,
table map[primitive.Mes
}
+func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset
int64) {
+ pc.mq2seekOffset.Store(mq, offset)
+ rlog.Info("pull consumer seek offset", map[string]interface{}{
+ "mq": mq,
+ "offset": offset,
+ })
+}
+
+func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue,
timestamp int64) (int64, error) {
+ return pc.searchOffsetByTimestamp(mq, timestamp)
+}
+
func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll,
mqDivided []*primitive.MessageQueue) {
+ if pc.SubType == Assign {
+ return
+ }
+
var allocateQueues []*primitive.MessageQueue
pc.defaultConsumer.processQueueTable.Range(func(key, value interface{})
bool {
mq := key.(primitive.MessageQueue)
@@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
+
+ nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
beginTime := time.Now()
sd := v.(*internal.SubscriptionData)
@@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
- QueueOffset: request.nextOffset,
+ QueueOffset: nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
@@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error {
return fmt.Errorf("consumerGroup can't equal [%s], please
specify another one", internal.DefaultConsumerGroup)
}
+ if pc.SubType == None {
+ return errors2.ErrBlankSubType
+ }
+
return nil
}
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index 930f0a3..7f29dd3 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) {
stats := NewStatsManager()
st := time.Now()
- for {
+ for {
stats.increasePullTPS("rocketmq", "default", 1)
- time.Sleep(500*time.Millisecond)
+ time.Sleep(500 * time.Millisecond)
if time.Now().Sub(st) > 5*time.Minute {
break
}
diff --git a/errors/errors.go b/errors/errors.go
index 2899506..03d0fb5 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -34,11 +34,14 @@ var (
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
ErrStartTopic = errors.New("cannot subscribe topic since client
either failed to start or has been shutdown.")
+ ErrSubscriptionType = errors.New("subscribe type is not matched")
+ ErrBlankSubType = errors.New("subscribe type should not be blank")
ErrResponse = errors.New("response error")
ErrCompressLevel = errors.New("unsupported compress level")
ErrUnknownIP = errors.New("unknown IP address")
ErrService = errors.New("service close is not running, please
check")
ErrTopicNotExist = errors.New("topic not exist")
+ ErrRouteNotFound = errors.New("topic route not found")
ErrNotExisted = errors.New("not existed")
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
ErrMultiIP = errors.New("multiple IP addr does not support")
diff --git a/examples/consumer/pull/poll_assign/main.go
b/examples/consumer/pull/poll_assign/main.go
new file mode 100644
index 0000000..838400a
--- /dev/null
+++ b/examples/consumer/pull/poll_assign/main.go
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "log"
+ _ "net/http/pprof"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+const (
+ nameSrvAddr = "http://127.0.0.1:9876"
+ accessKey = "rocketmq"
+ secretKey = "12345678"
+ topic = "test-topic"
+ consumerGroupName = "testPullGroup"
+ tag = "testPull"
+ namespace = "ns"
+)
+
+var pullConsumer rocketmq.PullConsumer
+var sleepTime = 1 * time.Second
+
+func main() {
+ rlog.SetLogLevel("info")
+ var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+ if err != nil {
+ log.Fatalf("NewNamesrvAddr err: %v", err)
+ }
+ pullConsumer, err = rocketmq.NewPullConsumer(
+ consumer.WithGroupName(consumerGroupName),
+ consumer.WithNameServer(nameSrv),
+ consumer.WithNamespace(namespace),
+ consumer.WithMaxReconsumeTimes(2),
+ )
+ if err != nil {
+ log.Fatalf("fail to new pullConsumer: %v", err)
+ }
+
+ // assign nil firstly to help consumer start up
+ err = pullConsumer.Assign(topic, nil)
+ if err != nil {
+ log.Fatalf("fail to Assign: %v", err)
+ }
+ err = pullConsumer.Start()
+ if err != nil {
+ log.Fatalf("fail to Start: %v", err)
+ }
+
+ mqs, err := pullConsumer.GetTopicRouteInfo(topic)
+ if err != nil {
+ log.Fatalf("fail to GetTopicRouteInfo: %v", err)
+ }
+
+ for _, mq := range mqs {
+ offset, err := pullConsumer.OffsetForTimestamp(mq,
time.Now().UnixMilli()-60*10)
+ if err != nil {
+ log.Fatalf("fail to get offset for timestamp: %v", err)
+ } else {
+ pullConsumer.SeekOffset(mq, offset)
+ }
+ }
+
+ err = pullConsumer.Assign(topic, mqs)
+ if err != nil {
+ log.Fatalf("fail to Assign: %v", err)
+ }
+
+ for {
+ poll()
+ }
+}
+
+func poll() {
+ cr, err := pullConsumer.Poll(context.TODO(), time.Second*5)
+ if consumer.IsNoNewMsgError(err) {
+ log.Println("no new msg")
+ return
+ }
+ if err != nil {
+ log.Printf("[poll error] err=%v", err)
+ time.Sleep(sleepTime)
+ return
+ }
+
+ // todo LOGIC CODE HERE
+ log.Println("msgList: ", cr.GetMsgList())
+ log.Println("messageQueue: ", cr.GetMQ())
+ log.Println("processQueue: ", cr.GetPQ())
+ // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
+ pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
+}
diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go
index 248c837..fab61ab 100644
--- a/examples/consumer/tls/main.go
+++ b/examples/consumer/tls/main.go
@@ -1,59 +1,59 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
- "context"
- "fmt"
- "os"
- "time"
-
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/primitive"
-)
-
-func main() {
- c, _ := rocketmq.NewPushConsumer(
- consumer.WithGroupName("testGroup"),
-
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
- consumer.WithTls(true),
- )
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
context.Context,
- msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
- for i := range msgs {
- fmt.Printf("subscribe callback: %v \n", msgs[i])
- }
-
- return consumer.ConsumeSuccess, nil
- })
- if err != nil {
- fmt.Println(err.Error())
- }
- // Note: start after subscribe
- err = c.Start()
- if err != nil {
- fmt.Println(err.Error())
- os.Exit(-1)
- }
- time.Sleep(time.Hour)
- err = c.Shutdown()
- if err != nil {
- fmt.Printf("shutdown Consumer error: %s", err.Error())
- }
-}
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithTls(true),
+ )
+ err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ for i := range msgs {
+ fmt.Printf("subscribe callback: %v \n", msgs[i])
+ }
+
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+ err = c.Shutdown()
+ if err != nil {
+ fmt.Printf("shutdown Consumer error: %s", err.Error())
+ }
+}
diff --git a/examples/producer/rpc/async/main.go
b/examples/producer/rpc/async/main.go
index c6676d3..c0ce57b 100644
--- a/examples/producer/rpc/async/main.go
+++ b/examples/producer/rpc/async/main.go
@@ -15,7 +15,6 @@ See the License for the specific language governing
permissions and
limitations under the License.
*/
-
package main
import (
diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go
index c926c05..ddaf165 100644
--- a/examples/producer/tls/main.go
+++ b/examples/producer/tls/main.go
@@ -1,62 +1,62 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
- "context"
- "fmt"
- "os"
- "strconv"
-
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/producer"
-)
-
-// Package main implements a simple producer to send message.
-func main() {
- p, _ := rocketmq.NewProducer(
-
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
- producer.WithRetry(2),
- producer.WithTls(true),
- )
- err := p.Start()
- if err != nil {
- fmt.Printf("start producer error: %s", err.Error())
- os.Exit(1)
- }
- topic := "test"
-
- for i := 0; i < 10; i++ {
- msg := &primitive.Message{
- Topic: topic,
- Body: []byte("Hello RocketMQ Go Client! " +
strconv.Itoa(i)),
- }
- res, err := p.SendSync(context.Background(), msg)
-
- if err != nil {
- fmt.Printf("send message error: %s\n", err)
- } else {
- fmt.Printf("send message success: result=%s\n",
res.String())
- }
- }
- err = p.Shutdown()
- if err != nil {
- fmt.Printf("shutdown producer error: %s", err.Error())
- }
-}
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strconv"
+
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+// Package main implements a simple producer to send message.
+func main() {
+ p, _ := rocketmq.NewProducer(
+
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithRetry(2),
+ producer.WithTls(true),
+ )
+ err := p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ topic := "test"
+
+ for i := 0; i < 10; i++ {
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("Hello RocketMQ Go Client! " +
strconv.Itoa(i)),
+ }
+ res, err := p.SendSync(context.Background(), msg)
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n",
res.String())
+ }
+ }
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shutdown producer error: %s", err.Error())
+ }
+}