laozhuzz opened a new issue #705:
URL: https://github.com/apache/rocketmq-client-go/issues/705
version 2.1
比如
1个pushconsumer subscribe
1个producer发送消息
pushconsumer shutdown()之后, producer发送消息报错: service close is not running,
please check
`
2021/07/30 17:43:47 Ctrl+C to unsubscribe, another Ctrl+C to exit
2021/07/30 17:43:51 recv signal interrupt
2021/07/30 17:43:51 Unsubscribe succ
2021/07/30 17:43:51 exit. send stopmsg fail. err:service close is not
running, please check
`
希望 consumer 和 producer 相互不影响
代码如下:
`
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func StartSubscriber(name string, topic string) rocketmq.PushConsumer {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(topic),
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
)
if err != nil {
log.Printf("fail init rocketmq consumer. err: %v", err)
return nil
}
err = c.Subscribe(topic, consumer.MessageSelector{},
func(c context.Context, msgs ...*primitive.MessageExt)
(consumer.ConsumeResult, error) {
for _, msg := range msgs {
log.Printf("%v get msg topic:%v body:%v", name,
msg.Topic, string(msg.Body))
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
log.Printf("Subscribe fail %v", err)
return nil
}
err = c.Start()
if err != nil {
log.Printf("Subscribe start fail %v", err)
return nil
}
return c
}
func main() {
config := flag.Uint("i", 0, "index")
flag.Parse()
index := *config
topic := "testTopic"
c1 := StartSubscriber("C1", topic)
p, _ := rocketmq.NewProducer(
producer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
producer.WithGroupName(topic),
producer.WithRetry(5),
producer.WithSendMsgTimeout(3*time.Second),
)
if err := p.Start(); err != nil {
log.Printf("init rocketmq producer fail: %v", err)
return
}
msg := fmt.Sprintf("start%v", index)
_, err := p.SendSync(context.Background(), primitive.NewMessage(topic,
[]byte(msg)))
if err != nil {
log.Print(err)
} else {
//log.Print(r)
}
time.Sleep(1 * time.Second)
log.Printf("Ctrl+C to unsubscribe, another Ctrl+C to exit")
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
stopped := false
for {
s := <-c
log.Printf("recv signal %v", s)
switch s {
default:
if stopped {
return
}
stopped = true
if err := c1.Unsubscribe(topic); err != nil {
log.Printf("Unsubscribe fail %v", err)
} else {
log.Printf("Unsubscribe succ")
c1.Shutdown()
msg := fmt.Sprintf("stop%v", index)
_, err := p.SendSync(context.Background(),
primitive.NewMessage(topic, []byte(msg)))
if err != nil {
log.Fatalf("exit. send stopmsg fail.
err:%v", err)
} else {
//log.Print(r)
}
time.Sleep(1 * time.Second)
}
}
}
}
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]