roneywei commented on issue #166: Delay queue
URL: https://github.com/apache/rocketmq-spring/issues/166#issuecomment-551373601

I am sending delayed messages with RocketMQTemplate.java. The demo is as follows:

    @Test
    public void syncDelayQueue() {
        String messageContent = "syncDelayQueue";
        Map h = new ConcurrentHashMap();
        h.put("message", messageContent);
        h.put("defaultQueue", syncDelayQueue);
        Message<?> message = MessageBuilder
                .withPayload(JSON.toJSON(h))
                .setHeader("contentType", "application/json")
                .build();
        producer.syncDelayQueue(message, MessageDelayLevel.ONE_S);
    }

    public void syncDelayQueue(Message<?> message, MessageDelayLevel messageDelayLevel) {
        log.info("send syncDelayQueue message :{};rocketMqDelayLevel:{}", message, messageDelayLevel);
        this.rocketMqUtils.syncDelayQueue(syncDelayQueue, message, messageDelayLevel);
    }

    public SendResult syncDelayQueue(String queueName, Message<?> message, MessageDelayLevel messageDelayLevel) {
        log.info("syncDelayQueue queueName:{},message:{},delayLevel:{}", queueName, message, messageDelayLevel);
        SendResult result = rocketMQTemplate.syncSend(queueName, message,
                rocketMQTemplate.getProducer().getSendMsgTimeout(), messageDelayLevel.getValue());
        log.info("syncDelayQueue result:{}", result);
        return result;
    }

What I send is an org.springframework.messaging.Message. How can I directly receive, on the consumer side, the content that was sent as an org.springframework.messaging.Message?

    @Component
    @RocketMQMessageListener(topic = "${mq.demo.sync-delay-queue:syncDelayQueueDemo}", consumerGroup = "demo")
    class SyncDelayQueueDemo implements RocketMQListener<Message> {
        @Override
        public void onMessage(Message message) {
            log.info("RocketMqConsumer1 syncDelayQueueDemo message :{} ", message);
        }
    }

The error is as follows:

    java.lang.RuntimeException: cannot convert message to interface org.springframework.messaging.Message
        at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:382)
        at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:57)
        at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:330)
        at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
