wyhbert opened a new issue #489: RocketMQ consumer startup problem
URL: https://github.com/apache/rocketmq/issues/489
 
 
   Error message:
   12-Oct-2018 09:57:09.717 INFO [main] 
org.apache.catalina.startup.HostConfig.deployWAR Deploying web application 
archive [/data/tomcats/qm-wms-inventory-service/tomcat/webapps/ROOT.war]
   12-Oct-2018 09:57:12.791 INFO [main] 
org.apache.jasper.servlet.TldScanner.scanJars At least one JAR was scanned for 
TLDs yet contained no TLDs. Enable debug logging for this logger for a complete 
list of JARs that were scanned but no TLDs were found in them. Skipping 
unneeded JARs during scanning can improve startup time and JSP compilation time.
   2018-10-12 09:57:16 INFO  
com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:32 - start 
consumer.....
   DefaultConsumer Started.
   2018-10-12 09:57:18 INFO  org.hibernate.validator.internal.util.Version 
<clinit>:17 - HV000001: Hibernate Validator 5.2.2.Final
   12-Oct-2018 09:57:19.395 INFO [main] 
org.apache.catalina.startup.HostConfig.deployWAR Deployment of web application 
archive [/data/tomcats/qm-wms-inventory-service/tomcat/webapps/ROOT.war] has 
finished in [9,677] ms
   12-Oct-2018 09:57:19.399 INFO [main] 
org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler 
["http-nio-8081"]
   12-Oct-2018 09:57:19.412 INFO [main] 
org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler 
["ajp-nio-8010"]
   12-Oct-2018 09:57:19.415 INFO [main] 
org.apache.catalina.startup.Catalina.start Server startup in 9752 ms
   2018-10-12 09:57:21 INFO  
com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:32 - start 
consumer.....
   2018-10-12 09:57:21 ERROR 
com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:47 - 
defaultConsumer start error
   com.alibaba.rocketmq.client.exception.MQClientException: The consumer 
group[qm] has been created before, specify another name please.
   See https://github.com/alibaba/RocketMQ/issues/40 for further details.
           at 
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:714)
           at 
com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:365)
           at 
com.qmfresh.inventory.message.consumer.DefaultConsumer.doStart(DefaultConsumer.java:45)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1702)
   
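   1) "start consumer....." is logged more than once. The first consumer registration succeeds, but the second attempt fails with the error that the consumer group has already been created.
   2) Configuration of the consumer's initialization method, doStart:
   The consumer is declared as a bean in the Spring configuration file: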
   <bean id="defaultConsumer" 
class="com.qmfresh.inventory.message.consumer.DefaultConsumer"
                  init-method="doStart">
        </bean>
   
   The class file is as follows:
   public class DefaultConsumer {
       private static final Logger logger = 
LoggerFactory.getLogger(DefaultConsumer.class);
       private Map<String, MessageListener> listenMap;
   
       @Resource
       private RocketMqConfig config;
   
       public void doStart() {
           logger.info("start consumer.....");
           DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer();
           defaultConsumer.setConsumerGroup("qm");
           defaultConsumer.setMessageModel(MessageModel.BROADCASTING);
           defaultConsumer.setNamesrvAddr(config.getServerAddress());
           // Set the subscribed topics
           Map<String, String> subscription = new HashMap<>(16);
           subscription.put("supplier_receive_confirmed", "*");
           subscription.put("supplier_return_confirmed", "*");
           defaultConsumer.setSubscription(subscription);
   
           defaultConsumer.registerMessageListener(new 
DefaultMessageListenerConcurrently());
           try {
               defaultConsumer.start();
           } catch (Exception e) {
               logger.error("defaultConsumer start error", e);
           }
   
           System.out.println("DefaultConsumer Started.");
       }
   
       private class DefaultMessageListenerConcurrently implements 
MessageListenerConcurrently {
   
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
               System.out.println(Thread.currentThread().getName() + " Receive 
New Messages: " + msgs);
               Map<String, List<MessageExt>> topicMap = 
msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
               if (listenMap == null) {
                   listenMap = 
SpringBeanUtil.getApplicationContext().getBeansOfType(MessageListener.class);
               }
               for (Map.Entry<String, MessageListener> entry : 
listenMap.entrySet()) {
                   MessageListener consumer = 
SpringBeanUtil.getBean(entry.getKey());
                   List<MessageExt> subMsgs = topicMap.get(consumer.getTopic());
                   if (subMsgs != null && !subMsgs.isEmpty()) {
                       try {
                           consumer.handleMessage(subMsgs);
                       } catch (Exception e) {
                           logger.error("Message handling error: " + e.getMessage(), e);
                       }
                   }
               }
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       }
   }
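
   Because the log shows "start consumer....." twice, one thing worth checking is whether doStart is called twice on the same bean instance or once each on two separate instances (for example, if the same bean definition were loaded by more than one Spring application context, which is only a guess here). The following is a minimal diagnostic sketch, not part of RocketMQ or Spring; StartCallRecorder and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not a RocketMQ or Spring API): records every
// call to an init method so the log shows which object was started, on which
// thread, and through which call chain.
class StartCallRecorder {
    private static final List<String> CALLS = new ArrayList<>();

    // Intended to be called as the first statement of doStart(), passing `this`.
    // Returns how many calls have been recorded so far, including this one.
    static synchronized int record(Object bean) {
        StringBuilder sb = new StringBuilder();
        sb.append(bean.getClass().getName())
          .append('@')
          .append(Integer.toHexString(System.identityHashCode(bean)))
          .append(" thread=")
          .append(Thread.currentThread().getName());
        // Append the caller chain so the component that triggered the call is visible.
        for (StackTraceElement frame : new Throwable().getStackTrace()) {
            sb.append("\n    at ").append(frame);
        }
        CALLS.add(sb.toString());
        System.out.println("init call #" + CALLS.size() + ": " + sb);
        return CALLS.size();
    }

    // Returns a copy of the recorded call descriptions.
    static synchronized List<String> calls() {
        return new ArrayList<>(CALLS);
    }
}
```

   If the two recorded entries show different identity hashes, two DefaultConsumer objects were created; if they show the same hash, one object had its init method invoked twice. Comparing the two stack traces should indicate which component triggered each call.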
   
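   Note: in the local and test environments the application and the consumer start normally, but in the production environment the consumer is registered more than once. Please suggest how to approach this problem.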
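
   One possible stopgap, assuming both doStart calls happen inside the same JVM and class loader, is to let the start action run at most once per consumer group. The sketch below is only an illustration: ConsumerStartGuard is a hypothetical helper, not a RocketMQ API, and it hides the duplicate initialization rather than explaining it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard (not a RocketMQ API): runs a start action at most once
// per consumer group name within the current class loader.
class ConsumerStartGuard {
    private static final Set<String> STARTED_GROUPS = ConcurrentHashMap.newKeySet();

    // Runs startAction only if no start was recorded for this group yet.
    // Returns true if the action ran, false if the call was skipped.
    static boolean startOnce(String group, Runnable startAction) {
        if (!STARTED_GROUPS.add(group)) {
            return false; // a start for this group was already recorded
        }
        try {
            startAction.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the group again so a later call can retry after a failure.
            STARTED_GROUPS.remove(group);
            throw e;
        }
    }
}
```

   In doStart, the consumer setup and the existing try/catch around defaultConsumer.start() could be moved into the Runnable passed to ConsumerStartGuard.startOnce("qm", ...), so that a second invocation is skipped instead of failing. Finding out why the init method runs twice in production only is still the better fix.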
