pandaapo opened a new issue, #4595: URL: https://github.com/apache/eventmesh/issues/4595
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no similar issues. ### Environment Windows ### EventMesh version master ### What happened Using RabbitMQ as the storage plugin for Eventmesh, it is not possible to successfully publish and subscribe via HTTP requests (refer to https://eventmesh.apache.org/docs/sdk-java/http#using-curl-command). 当使用RabbitMQ作为Eventmesh的存储插件时,通过HTTP请求的方式(参照这里https://eventmesh.apache.org/docs/sdk-java/http#using-curl-command )无法进行发布和订阅。 ### How to reproduce The content seems a bit long because I tried two situations and found that they were not exactly the same. I tried my best to describe each situation clearly. **The first scenario:** Execute the publish first, indicating successful publish, but there is no change on RabbitMQ. ``` // The request to publish the message http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC [header] Content-Type: application/json [body] {"name": "admin", "pass":"12345678"} // Received a successful response but no change in RabbitMQ {"retCode":0,"retMsg":"successSendResult[topic=TEST-TOPIC-HTTP-ASYNC, messageId=c12f0a16-b195-43cc-9d23-6308688853b6]"} ``` If a message is published again at this time, EventMesh will report an error as follows ``` 13:47:39.315 [eventMesh-sendMsg-2] ERROR org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer - [RabbitmqProducer] publish happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] 13:47:39.456 [eventMesh-sendMsg-2] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor - message|eventMesh2mq|REQ|ASYNC|send2MQCost=146ms|topic=TEST-TOPIC-HTTP-ASYNC|bizSeqNo=32904550674459604496185146551151|uniqueId=38583629171915403501164020610522 org.apache.eventmesh.api.exception.StorageRuntimeException: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:120) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] ... 7 more ``` **The second scenario:** Start the HTTP service `org.apache.eventmesh.htp.demo.sub.SpringBootDemoApplication` required by the subscriber first. At this point, RabbitMQ will create an exchange 'eventmesh.default' of 'topic' type and create a queue called 'DefaultQueue' bound to that exchange. Then execute the message publishing request, and the specific request is the same as in the first case, receiving a successful publishing response, but RabbitMQ did not receive the data. Repeatedly publishing messages, you will receive a successful response, but RabbitMQ has not received any data. Then execute the message subscription request, and EventMesh will report an error. ``` // The request to publish the message http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC [header] Content-Type: application/json [body] {"name": "admin", "pass":"12345678"} // Request of subscribing http://127.0.0.1:10105/eventmesh/subscribe/local [header] Content-Type: application/json [body] { "url": "http://127.0.0.1:8088/sub/test", "consumerGroup": "TEST-GROUP", "topic": [ { "mode": "CLUSTERING", "topic": "TEST-TOPIC-HTTP-ASYNC", "type": "ASYNC" } ] } // Error reported by EventMesh for request of subscribing 14:07:57.105 [eventMesh-clientManage-2] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - eventMesh protocol[body] error java.lang.RuntimeException: eventMesh protocol[body] error at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] ``` Then restart EvenMesh and execute the message publishing request. RabbitMQ will only receive the data and can publish it multiple times. But when executing the subscription request, EvenMesh still reported an error, error message as described above. --- 内容看起来好像有些长,因为我试了两种情况,发现不尽相同,我尽量将每种情况描述清楚。 **第一种情况:** 先执行消息发布请求,提示发布成功,但是RabbitMQ上没有变化。 ``` // 发布消息的请求 http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC [header] Content-Type: application/json [body] {"name": "admin", "pass":"12345678"} // 收到成功响应,但是RabbitMQ没有任何变化 {"retCode":0,"retMsg":"successSendResult[topic=TEST-TOPIC-HTTP-ASYNC, messageId=c12f0a16-b195-43cc-9d23-6308688853b6]"} ``` 此时如果再发布一次消息,EventMesh会报如下错误 ``` 13:47:39.315 [eventMesh-sendMsg-2] ERROR org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer - [RabbitmqProducer] publish happen exception. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] 13:47:39.456 [eventMesh-sendMsg-2] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor - message|eventMesh2mq|REQ|ASYNC|send2MQCost=146ms|topic=TEST-TOPIC-HTTP-ASYNC|bizSeqNo=32904550674459604496185146551151|uniqueId=38583629171915403501164020610522 org.apache.eventmesh.api.exception.StorageRuntimeException: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:120) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', class-id=60, method-id=40) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) ~[amqp-client-5.16.0.jar:5.16.0] at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207) ~[amqp-client-5.16.0.jar:5.16.0] at org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] at org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108) ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release] ... 7 more ``` **第二种情况:** 先启动订阅端需要的HTTP服务`org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication`,这时RabbitMQ会创建“topic”类型的交互机“eventmesh.default”,会创建绑定到该交互机的队列“DefaultQueue”。 然后执行消息发布请求,具体请求和第一种情况中一样,会收到发布成功的响应,但RabbitMQ没有收到数据。多次发布消息,都会收到发布成功的响应,但RabbitMQ都没有收到数据。 然后执行消息订阅请求,EventMesh会报错。 ``` // 发布消息的请求 http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC [header] Content-Type: application/json [body] {"name": "admin", "pass":"12345678"} // 订阅消息的请求 http://127.0.0.1:10105/eventmesh/subscribe/local [header] Content-Type: application/json [body] { "url": "http://127.0.0.1:8088/sub/test", "consumerGroup": "TEST-GROUP", "topic": [ { "mode": "CLUSTERING", "topic": "TEST-TOPIC-HTTP-ASYNC", "type": "ASYNC" } ] } // EventMesh对订阅消息请求所报的错 14:07:57.105 [eventMesh-clientManage-2] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - eventMesh protocol[body] error java.lang.RuntimeException: eventMesh protocol[body] error at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97) [main/:?] at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] ``` 然后重启EvenMesh,再执行消息发布请求,这时RabbitMQ才会收到数据,并且可以多次发布。但是执行订阅请求,EvenMesh依旧报错,错误信息如上描述。 ### Debug logs _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [ ] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) * -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
