xingzhefeng commented on issue #503: Message was consumed successfully, exactly once, but rocketmq-console shows NOT_CONSUME_YET
URL: https://github.com/apache/rocketmq/issues/503#issuecomment-432969449

The problem lies in the comparison between a domain name and an IP address. Modify the public boolean consumed(final MessageExt msg, final String group) method of the DefaultMQAdminExtImpl class:

`public boolean consumed(final MessageExt msg, final String group)
    throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
    log.info("Checking whether the message has been consumed, group={}, msg={}", group, msg);
    ConsumeStats cstats = this.examineConsumeStats(group);
    ClusterInfo ci = this.examineBrokerClusterInfo();
    Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
    log.info("it: {}", it);
    while (it.hasNext()) {
        Entry<MessageQueue, OffsetWrapper> next = it.next();
        MessageQueue mq = next.getKey();
        if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
            BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
            log.info("Broker data from the server: {}", brokerData);
            if (brokerData != null) {
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                log.info("Broker address: {}, store host: {}", addr,
                    RemotingUtil.socketAddress2String(msg.getStoreHost()));
                log.info("Consumer offset: {}, queue offset: {}",
                    next.getValue().getConsumerOffset(), msg.getQueueOffset());
                // The original address check is disabled: addr may be a domain name
                // while the store host is an IP address, so the strings differ even
                // though they refer to the same broker.
                // if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
                // }
                if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
                    log.info("One side is a domain name and the other an IP address, but they are the same broker; returning true");
                    return true;
                }
            }
        } else {
            log.info("Message queue did not match topic/queueId: {}", mq);
        }
    }
    return false;
}`
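
The change above drops the address comparison entirely. Since queue IDs are assigned per broker, a cluster with several brokers could then match a queue with the same topic and queue ID on a different broker. A possible alternative, sketched below, keeps the check but resolves both hosts before comparing. `BrokerAddrMatch` and `sameEndpoint` are hypothetical names, not part of RocketMQ, and the sketch assumes both inputs are `host:port` strings like the ones produced by `RemotingUtil.socketAddress2String`.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not a RocketMQ class): compares two "host:port"
// strings where either host may be a domain name or an IP literal.
final class BrokerAddrMatch {

    // Returns true when the ports are equal and the two hosts share at
    // least one resolved IP address.
    static boolean sameEndpoint(String a, String b) {
        int ia = a.lastIndexOf(':');
        int ib = b.lastIndexOf(':');
        if (ia < 0 || ib < 0) {
            return a.equals(b); // no port part: fall back to plain comparison
        }
        if (!a.substring(ia + 1).equals(b.substring(ib + 1))) {
            return false; // different ports can never be the same endpoint
        }
        String hostA = a.substring(0, ia);
        String hostB = b.substring(0, ib);
        if (hostA.equalsIgnoreCase(hostB)) {
            return true; // identical host text, no lookup needed
        }
        try {
            // IP literals are parsed without a DNS lookup; names are resolved.
            InetAddress[] addrsA = InetAddress.getAllByName(hostA);
            InetAddress[] addrsB = InetAddress.getAllByName(hostB);
            for (InetAddress x : addrsA) {
                for (InetAddress y : addrsB) {
                    if (x.equals(y)) {
                        return true;
                    }
                }
            }
        } catch (UnknownHostException e) {
            // An unresolvable host is treated as "not the same broker".
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(sameEndpoint("127.0.0.1:10911", "127.0.0.1:10911"));
        // Result depends on how "localhost" resolves on the local machine.
        System.out.println(sameEndpoint("localhost:10911", "127.0.0.1:10911"));
    }
}
```

Inside `consumed`, the disabled condition could then become `BrokerAddrMatch.sameEndpoint(addr, RemotingUtil.socketAddress2String(msg.getStoreHost()))`, combined with the offset comparison. Resolving names on every call costs a DNS lookup, so this is a sketch of the idea rather than a tuned implementation.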
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
