pandaapo opened a new issue, #4595:
URL: https://github.com/apache/eventmesh/issues/4595

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Environment
   
   Windows
   
   ### EventMesh version
   
   master
   
   ### What happened
   
   Using RabbitMQ as the storage plugin for Eventmesh, it is not possible to 
successfully publish and subscribe via HTTP requests (refer to 
https://eventmesh.apache.org/docs/sdk-java/http#using-curl-command).
   
   
当使用RabbitMQ作为Eventmesh的存储插件时,通过HTTP请求的方式(参照这里https://eventmesh.apache.org/docs/sdk-java/http#using-curl-command
 )无法进行发布和订阅。
   
   ### How to reproduce
   
   The content seems a bit long because I tried two situations and found that 
they were not exactly the same. I tried my best to describe each situation 
clearly.
   
   **The first scenario:**
   Execute the publish first, indicating successful publish, but there is no 
change on RabbitMQ.
   ```
   // The request to publish the message
   http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC
   [header] Content-Type: application/json
   [body] {"name": "admin", "pass":"12345678"}
   
   // Received a successful response but no change in RabbitMQ
   {"retCode":0,"retMsg":"successSendResult[topic=TEST-TOPIC-HTTP-ASYNC, 
messageId=c12f0a16-b195-43cc-9d23-6308688853b6]"}
   ```
   If a message is published again at this time, EventMesh will report an error 
as follows
   ```
   13:47:39.315 [eventMesh-sendMsg-2] ERROR 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer - 
[RabbitmqProducer] publish happen exception.
   com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
channel error; protocol method: #method<channel.close>(reply-code=404, 
reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', 
class-id=60, method-id=40)
        at 
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) 
~[amqp-client-5.16.0.jar:5.16.0]
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
 ~[amqp-client-5.16.0.jar:5.16.0]
        at 
org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108)
 [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   13:47:39.456 [eventMesh-sendMsg-2] ERROR 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor
 - 
message|eventMesh2mq|REQ|ASYNC|send2MQCost=146ms|topic=TEST-TOPIC-HTTP-ASYNC|bizSeqNo=32904550674459604496185146551151|uniqueId=38583629171915403501164020610522
   org.apache.eventmesh.api.exception.StorageRuntimeException: 
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
channel error; protocol method: #method<channel.close>(reply-code=404, 
reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', 
class-id=60, method-id=40)
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:120)
 [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already 
closed due to channel error; protocol method: 
#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 
'eventmesh.default' in vhost '/', class-id=60, method-id=40)
        at 
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) 
~[amqp-client-5.16.0.jar:5.16.0]
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
 ~[amqp-client-5.16.0.jar:5.16.0]
        at 
org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        ... 7 more
   ```
   
   **The second scenario:**
   Start the HTTP service 
`org.apache.eventmesh.htp.demo.sub.SpringBootDemoApplication` required by the 
subscriber first. At this point, RabbitMQ will create an exchange 
'eventmesh.default' of 'topic' type and create a queue called 'DefaultQueue' 
bound to that exchange.
   Then execute the message publishing request, and the specific request is the 
same as in the first case, receiving a successful publishing response, but 
RabbitMQ did not receive the data. Repeatedly publishing messages, you will 
receive a successful response, but RabbitMQ has not received any data.
   Then execute the message subscription request, and EventMesh will report an 
error.
   ```
   // The request to publish the message
   http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC
   [header] Content-Type: application/json
   [body] {"name": "admin", "pass":"12345678"}
   
   // Request of subscribing
   http://127.0.0.1:10105/eventmesh/subscribe/local
   [header] Content-Type: application/json
   [body] {
       "url": "http://127.0.0.1:8088/sub/test";,
       "consumerGroup": "TEST-GROUP",
       "topic": [
           {
               "mode": "CLUSTERING",
               "topic": "TEST-TOPIC-HTTP-ASYNC",
               "type": "ASYNC"
           }
       ]
   }
   
   // Error reported by EventMesh for request of subscribing
   14:07:57.105 [eventMesh-clientManage-2] ERROR 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - 
eventMesh protocol[body] error
   java.lang.RuntimeException: eventMesh protocol[body] error
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   ```
   Then restart EvenMesh and execute the message publishing request. RabbitMQ 
will only receive the data and can publish it multiple times. But when 
executing the subscription request, EvenMesh still reported an error, error 
message as described above.
   
   ---
   内容看起来好像有些长,因为我试了两种情况,发现不尽相同,我尽量将每种情况描述清楚。
   
   **第一种情况:**
   先执行消息发布请求,提示发布成功,但是RabbitMQ上没有变化。
   ```
   // 发布消息的请求
   http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC
   [header] Content-Type: application/json
   [body] {"name": "admin", "pass":"12345678"}
   
   // 收到成功响应,但是RabbitMQ没有任何变化
   {"retCode":0,"retMsg":"successSendResult[topic=TEST-TOPIC-HTTP-ASYNC, 
messageId=c12f0a16-b195-43cc-9d23-6308688853b6]"}
   ```
   此时如果再发布一次消息,EventMesh会报如下错误
   ```
   13:47:39.315 [eventMesh-sendMsg-2] ERROR 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer - 
[RabbitmqProducer] publish happen exception.
   com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
channel error; protocol method: #method<channel.close>(reply-code=404, 
reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', 
class-id=60, method-id=40)
        at 
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) 
~[amqp-client-5.16.0.jar:5.16.0]
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
 ~[amqp-client-5.16.0.jar:5.16.0]
        at 
org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108)
 [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   13:47:39.456 [eventMesh-sendMsg-2] ERROR 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor
 - 
message|eventMesh2mq|REQ|ASYNC|send2MQCost=146ms|topic=TEST-TOPIC-HTTP-ASYNC|bizSeqNo=32904550674459604496185146551151|uniqueId=38583629171915403501164020610522
   org.apache.eventmesh.api.exception.StorageRuntimeException: 
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
channel error; protocol method: #method<channel.close>(reply-code=404, 
reply-text=NOT_FOUND - no exchange 'eventmesh.default' in vhost '/', 
class-id=60, method-id=40)
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:120)
 [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper.send(MQProducerWrapper.java:79)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.send(EventMeshProducer.java:61)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncEventProcessor.handler(SendAsyncEventProcessor.java:248)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already 
closed due to channel error; protocol method: 
#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 
'eventmesh.default' in vhost '/', class-id=60, method-id=40)
        at 
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685) 
~[amqp-client-5.16.0.jar:5.16.0]
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675) 
~[amqp-client-5.16.0.jar:5.16.0]
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
 ~[amqp-client-5.16.0.jar:5.16.0]
        at 
org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient.publish(RabbitmqClient.java:75)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at 
org.apache.eventmesh.storage.rabbitmq.producer.RabbitmqProducer.publish(RabbitmqProducer.java:108)
 ~[eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        ... 7 more
   ```
   
   **第二种情况:**
   
先启动订阅端需要的HTTP服务`org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication`,这时RabbitMQ会创建“topic”类型的交互机“eventmesh.default”,会创建绑定到该交互机的队列“DefaultQueue”。
   
然后执行消息发布请求,具体请求和第一种情况中一样,会收到发布成功的响应,但RabbitMQ没有收到数据。多次发布消息,都会收到发布成功的响应,但RabbitMQ都没有收到数据。
   然后执行消息订阅请求,EventMesh会报错。
   ```
   // 发布消息的请求
   http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC
   [header] Content-Type: application/json
   [body] {"name": "admin", "pass":"12345678"}
   
   // 订阅消息的请求
   http://127.0.0.1:10105/eventmesh/subscribe/local
   [header] Content-Type: application/json
   [body] {
       "url": "http://127.0.0.1:8088/sub/test";,
       "consumerGroup": "TEST-GROUP",
       "topic": [
           {
               "mode": "CLUSTERING",
               "topic": "TEST-TOPIC-HTTP-ASYNC",
               "type": "ASYNC"
           }
       ]
   }
   
   // EventMesh对订阅消息请求所报的错
   14:07:57.105 [eventMesh-clientManage-2] ERROR 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - 
eventMesh protocol[body] error
   java.lang.RuntimeException: eventMesh protocol[body] error
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97)
 [main/:?]
        at 
org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280)
 [main/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
   ```
   
然后重启EvenMesh,再执行消息发布请求,这时RabbitMQ才会收到数据,并且可以多次发布。但是执行订阅请求,EvenMesh依旧报错,错误信息如上描述。
   
   ### Debug logs
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [ ] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct) *


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to