wyhbert opened a new issue #489: rocketmq消费端启动问题 URL: https://github.com/apache/rocketmq/issues/489 错误信息: 12-Oct-2018 09:57:09.717 INFO [main] org.apache.catalina.startup.HostConfig.deployWAR Deploying web application archive [/data/tomcats/qm-wms-inventory-service/tomcat/webapps/ROOT.war] 12-Oct-2018 09:57:12.791 INFO [main] org.apache.jasper.servlet.TldScanner.scanJars At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time. 2018-10-12 09:57:16 INFO com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:32 - start consumer..... DefaultConsumer Started. 2018-10-12 09:57:18 INFO org.hibernate.validator.internal.util.Version <clinit>:17 - HV000001: Hibernate Validator 5.2.2.Final 12-Oct-2018 09:57:19.395 INFO [main] org.apache.catalina.startup.HostConfig.deployWAR Deployment of web application archive [/data/tomcats/qm-wms-inventory-service/tomcat/webapps/ROOT.war] has finished in [9,677] ms 12-Oct-2018 09:57:19.399 INFO [main] org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler ["http-nio-8081"] 12-Oct-2018 09:57:19.412 INFO [main] org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler ["ajp-nio-8010"] 12-Oct-2018 09:57:19.415 INFO [main] org.apache.catalina.startup.Catalina.start Server startup in 9752 ms 2018-10-12 09:57:21 INFO com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:32 - start consumer..... 2018-10-12 09:57:21 ERROR com.qmfresh.inventory.message.consumer.DefaultConsumer doStart:47 - defaultConsumer start error com.alibaba.rocketmq.client.exception.MQClientException: The consumer group[qm] has been created before, specify another name please. See https://github.com/alibaba/RocketMQ/issues/40 for further details. at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:714) at com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:365) at com.qmfresh.inventory.message.consumer.DefaultConsumer.doStart(DefaultConsumer.java:45) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1702) 1)start consumer.....启动了多次,第一次消费端注册成功,第二次开始注册时,会出现consumer group已经创建过的错误。 2)消费端初始化注册doStart方法配置: spring配置文件通过bean配置方式注入 <bean id="defaultConsumer" class="com.qmfresh.inventory.message.consumer.DefaultConsumer" init-method="doStart"> </bean> 类文件如下: public class DefaultConsumer { private static final Logger logger = LoggerFactory.getLogger(DefaultConsumer.class); private Map<String, MessageListener> listenMap; @Resource private RocketMqConfig config; public void doStart() { logger.info("start consumer....."); DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer(); defaultConsumer.setConsumerGroup("qm"); defaultConsumer.setMessageModel(MessageModel.BROADCASTING); defaultConsumer.setNamesrvAddr(config.getServerAddress()); // 设置订阅topic Map<String, String> subscription = new HashMap<>(16); subscription.put("supplier_receive_confirmed", "*"); subscription.put("supplier_return_confirmed", "*"); defaultConsumer.setSubscription(subscription); defaultConsumer.registerMessageListener(new DefaultMessageListenerConcurrently()); try { defaultConsumer.start(); } catch (Exception e) { logger.error("defaultConsumer start error", e); } System.out.println("DefaultConsumer Started."); } private class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); Map<String, List<MessageExt>> topicMap = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic)); if (listenMap == null) { listenMap = SpringBeanUtil.getApplicationContext().getBeansOfType(MessageListener.class); } for (Map.Entry<String, MessageListener> entry : listenMap.entrySet()) { MessageListener consumer = SpringBeanUtil.getBean(entry.getKey()); List<MessageExt> subMsgs = topicMap.get(consumer.getTopic()); if (subMsgs != null && !subMsgs.isEmpty()) { try { consumer.handleMessage(subMsgs); } catch (Exception e) { logger.error("消息处理异常:" + e.getMessage(), e); } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } } 注:在本地环境和测试环境可以正常启动,消费者能正常启动,但在生产环境启动时,会出现多次注册的情况,请帮忙提供下解决问题的思路
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
