majintao opened a new issue, #1129:
URL: https://github.com/apache/rocketmq-client-go/issues/1129
`package rocket_mq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/skywalking-go/toolkit/trace"
)
type ProcFunc func(context.Context, []byte) error
type ConsumerProcessor struct {
opts []consumer.Option
selector consumer.MessageSelector
topic string
procFunc ProcFunc
consumer rocketmq.PushConsumer
}
func NewConsumerProcessor(topic string, procFunc ProcFunc, opts
...consumer.Option) *ConsumerProcessor {
return &ConsumerProcessor{
opts: opts,
topic: topic,
procFunc: procFunc,
}
}
func (cp *ConsumerProcessor) Start() error {
// 启动消费
c, err := rocketmq.NewPushConsumer(cp.opts...)
if err != nil {
return err
}
cp.consumer = c
c.Subscribe(cp.topic, cp.selector, func(ctx context.Context, messages
...*primitive.MessageExt) (
consumer.ConsumeResult, error) {
// 处理消息
for _, message := range messages {
err := cp.process(ctx, message)
if err != nil {
fmt.Printf("messageId:%s", message.MsgId)
return consumer.ConsumeRetryLater, nil
}
}
return consumer.ConsumeSuccess, nil
})
err = c.Start()
if err != nil {
return err
}
return nil
}
func (cp *ConsumerProcessor) process(ctx context.Context, message
*primitive.MessageExt) error {
// 添加链路追踪支持
_, err := trace.CreateLocalSpan("")
if err != nil {
}
defer trace.StopSpan()
err = cp.procFunc(ctx, message.Body)
if err != nil {
return err
}
return nil
}
func (cp *ConsumerProcessor) Stop() {
// 关闭消费
cp.consumer.Shutdown()
}
`
`
topic := "test_topic"
rocket_mq.NewConsumerProcessor(topic, func(c context.Context, msg
[]byte) error {
xflog.Infof("msg:%v", string(msg))
return errors.New("111")
}, consumer.WithRetry(-1), consumer.WithAutoCommit(false),
consumer.WithGroupName("abc"),
consumer.WithNameServer([]string{"10.34.22.164:9876",
"10.34.22.167:9876"})).Start()
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]