laozhuzz opened a new issue #705:
URL: https://github.com/apache/rocketmq-client-go/issues/705


   version 2.1
   
   比如
   1个pushconsumer subscribe
   1个producer发送消息
   
   pushconsumer shutdown()之后, producer发送消息报错: service close is not running, please check
   ```
   2021/07/30 17:43:47 Ctrl+C to unsubscribe, another Ctrl+C to exit
   2021/07/30 17:43:51 recv signal interrupt
   2021/07/30 17:43:51 Unsubscribe succ
   2021/07/30 17:43:51 exit. send stopmsg fail. err:service close is not running, please check
   ```
   
   希望 consumer 和 producer 相互不影响
   代码如下:
   `
   package main
   
   import (
        "context"
        "flag"
        "fmt"
        "log"
        "os"
        "os/signal"
        "time"
   
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/consumer"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
   )
   
   func StartSubscriber(name string, topic string) rocketmq.PushConsumer {
        c, err := rocketmq.NewPushConsumer(
                consumer.WithGroupName(topic),
                
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
        )
        if err != nil {
                log.Printf("fail init rocketmq consumer. err: %v", err)
                return nil
        }
        err = c.Subscribe(topic, consumer.MessageSelector{},
                func(c context.Context, msgs ...*primitive.MessageExt) 
(consumer.ConsumeResult, error) {
                        for _, msg := range msgs {
                                log.Printf("%v get msg topic:%v body:%v", name, 
msg.Topic, string(msg.Body))
                        }
                        return consumer.ConsumeSuccess, nil
                })
        if err != nil {
                log.Printf("Subscribe fail %v", err)
                return nil
        }
        err = c.Start()
        if err != nil {
                log.Printf("Subscribe start fail %v", err)
                return nil
        }
        return c
   }
   func main() {
        config := flag.Uint("i", 0, "index")
        flag.Parse()
   
        index := *config
        topic := "testTopic"
   
        c1 := StartSubscriber("C1", topic)
   
        p, _ := rocketmq.NewProducer(
                
producer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
                producer.WithGroupName(topic),
                producer.WithRetry(5),
                producer.WithSendMsgTimeout(3*time.Second),
        )
        if err := p.Start(); err != nil {
                log.Printf("init rocketmq producer fail: %v", err)
                return
        }
        msg := fmt.Sprintf("start%v", index)
        _, err := p.SendSync(context.Background(), primitive.NewMessage(topic, 
[]byte(msg)))
        if err != nil {
                log.Print(err)
        } else {
                //log.Print(r)
        }
        time.Sleep(1 * time.Second)
        log.Printf("Ctrl+C to unsubscribe, another Ctrl+C to exit")
   
        c := make(chan os.Signal)
        signal.Notify(c, os.Interrupt)
        stopped := false
        for {
                s := <-c
                log.Printf("recv signal %v", s)
                switch s {
                default:
                        if stopped {
                                return
                        }
                        stopped = true
                        if err := c1.Unsubscribe(topic); err != nil {
                                log.Printf("Unsubscribe fail %v", err)
                        } else {
                                log.Printf("Unsubscribe succ")
                                c1.Shutdown()
   
                                msg := fmt.Sprintf("stop%v", index)
                                _, err := p.SendSync(context.Background(), 
primitive.NewMessage(topic, []byte(msg)))
                                if err != nil {
                                        log.Fatalf("exit. send stopmsg fail. 
err:%v", err)
                                } else {
                                        //log.Print(r)
                                }
                                time.Sleep(1 * time.Second)
   
                        }
                }
        }
   
   }
   
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to