Jimmy01010 commented on issue #1033:
URL: 
https://github.com/apache/rocketmq-client-go/issues/1033#issuecomment-1577797514

   > > > 看错误是消费端挂了,我怀疑你消费端调用Shutdown了,但是程序没有关闭,能否把你消费端的代码贴出来看一下
   > > 
   > > 
   > > ```
   > > func ConfigConsumer() {
   > >  c, err := rocketmq.NewPushConsumer(
   > >          consumer.WithConsumerModel(consumer.Clustering),
   > >          
consumer.WithNameServer([]string{global.ServerConfig.RocketMQ.Address}),
   > >          
consumer.WithGroupName("config_consumer_"+global.ServerConfig.Name),
   > >          // consumer.WithGroupName("config_consumer_test1"),
   > >          consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
   > >          consumer.WithConsumerOrder(true),
   > >          consumer.WithConsumerPullTimeout(time.Second*60),
   > >  )
   > >  if err != nil {
   > >          panic("ConfigConsumer NewPushConsumer err:" + err.Error())
   > >  }
   > >  if err := c.Subscribe(global.ServerConfig.RocketMQ.Topic,
   > >          consumer.MessageSelector{},
   > >          func(ctx context.Context, msgs ...*primitive.MessageExt) 
(consumer.ConsumeResult, error) {
   > >                  for i := range msgs {
   > >                          err := DealMessage(msgs[i].Body)
   > >                          if err != nil {
   > >                                  zap.S().Errorf("处理消息错误 %v %v", msgs[i], 
err)
   > >                                  return consumer.ConsumeRetryLater, nil
   > >                          }
   > >                  }
   > >                  return consumer.ConsumeSuccess, nil
   > >          }); err != nil {
   > >          zap.S().Errorf("MQ:读取消息失败")
   > >  }
   > >  //开始消费消息
   > > 
   > >  err = c.Start()
   > >  if err != nil {
   > >          zap.S().Errorf("ConfigConsumer Start Consumer失败: %v", err)
   > >  }
   > >  quit := make(chan os.Signal)
   > >  signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) //nolint:govet
   > >  <-quit
   > >  err = c.Shutdown() //nolint:errcheck
   > >  if err != nil {
   > >          zap.S().Errorf("ConfigConsumer Shutdown 失败: %v", err)
   > >  }
   > > } 
   > > ```
   > 
   > 在这里看是没问题的,你再确认一下,在运行的这个进程里(这个项目),在其他地方有没有创建的有消费者,然后调用了shutdown方法的?
   
   > > > 看错误是消费端挂了,我怀疑你消费端调用Shutdown了,但是程序没有关闭,能否把你消费端的代码贴出来看一下
   > > 
   > > 
   > > ```
   > > func ConfigConsumer() {
   > >  c, err := rocketmq.NewPushConsumer(
   > >          consumer.WithConsumerModel(consumer.Clustering),
   > >          
consumer.WithNameServer([]string{global.ServerConfig.RocketMQ.Address}),
   > >          
consumer.WithGroupName("config_consumer_"+global.ServerConfig.Name),
   > >          // consumer.WithGroupName("config_consumer_test1"),
   > >          consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
   > >          consumer.WithConsumerOrder(true),
   > >          consumer.WithConsumerPullTimeout(time.Second*60),
   > >  )
   > >  if err != nil {
   > >          panic("ConfigConsumer NewPushConsumer err:" + err.Error())
   > >  }
   > >  if err := c.Subscribe(global.ServerConfig.RocketMQ.Topic,
   > >          consumer.MessageSelector{},
   > >          func(ctx context.Context, msgs ...*primitive.MessageExt) 
(consumer.ConsumeResult, error) {
   > >                  for i := range msgs {
   > >                          err := DealMessage(msgs[i].Body)
   > >                          if err != nil {
   > >                                  zap.S().Errorf("处理消息错误 %v %v", msgs[i], 
err)
   > >                                  return consumer.ConsumeRetryLater, nil
   > >                          }
   > >                  }
   > >                  return consumer.ConsumeSuccess, nil
   > >          }); err != nil {
   > >          zap.S().Errorf("MQ:读取消息失败")
   > >  }
   > >  //开始消费消息
   > > 
   > >  err = c.Start()
   > >  if err != nil {
   > >          zap.S().Errorf("ConfigConsumer Start Consumer失败: %v", err)
   > >  }
   > >  quit := make(chan os.Signal)
   > >  signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) //nolint:govet
   > >  <-quit
   > >  err = c.Shutdown() //nolint:errcheck
   > >  if err != nil {
   > >          zap.S().Errorf("ConfigConsumer Shutdown 失败: %v", err)
   > >  }
   > > } 
   > > ```
   > 
   > 在这里看是没问题的,你再确认一下,在运行的这个进程里(这个项目),在其他地方有没有创建的有消费者,然后调用了shutdown方法的?
   
   
确实在三个协程中分别创建了一个消费者(不同消费组),协程中都有在SIGTERM信号后调用Shutdown方法,但每个协程中的消费者都是调用NewPushConsumer新建的消费者实例,这样会导致相关问题吗?谢谢大佬
   ```
        // 订阅配置变化消息
        go rocketmq.ConfigConsumer()
        // 订阅数据计算消息
        go rocketmq.DataStreamConsumer()
        // 订阅定时任务
        timerConsumer := rocketmq.TimerConsumer()
        zap.S().Info("服务启动成功")
        // 接收终止信号
        quit := make(chan os.Signal)
        signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
        <-quit
        zap.S().Info("Shutdown Server ...")
        err = timerConsumer.Shutdown()
        if err != nil {
                zap.S().Errorf("timerConsumer Shutdown 失败: %v", err)
        }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to