[
https://issues.apache.org/jira/browse/FLINK-24806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440140#comment-17440140
]
Michał Ciesielczyk commented on FLINK-24806:
--------------------------------------------
Hi [~gaoyunhaii], the issue reported here is caused by the RabbitMQ service
not having started yet when the test tries to connect to it (so I'm not sure
it is related to timeout issues).
In the current code, we wait for the RMQ socket to be opened during
container startup (here:
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java#L94]
). It seems that the port can already be open while the RabbitMQ service
itself has not started yet.
To resolve it, I would recommend simply removing this line. Without it, the
container will use the default RabbitMQContainer wait strategy, which is to
wait for a specific log message ("Server startup complete").
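
To illustrate why an open port is a weaker readiness signal than a log
message, here is a minimal, stdlib-only sketch. It is not the Flink test code
and does not use Testcontainers; the class name ReadinessSketch, the helpers
portIsOpen and logContains, and the simulated log lines are all hypothetical.
A plain ServerSocket stands in for a broker whose port is bound before the
service has finished starting.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class ReadinessSketch {

    // Hypothetical marker, mirroring the log message mentioned above.
    static final String READY_MARKER = "Server startup complete";

    // Port-based check: succeeds as soon as anything is listening on the port,
    // regardless of whether the service behind it can handle requests yet.
    static boolean portIsOpen(int port) {
        try (Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port), 1000);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Log-based check: succeeds only once the service has announced readiness.
    static boolean logContains(List<String> logLines, String marker) {
        for (String line : logLines) {
            if (line.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        List<String> log = new ArrayList<>(); // simulated container log

        // The listener is bound but never accepts or answers anything,
        // which simulates "port opened, service not started yet".
        try (ServerSocket listener =
                new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            int port = listener.getLocalPort();
            log.add("Starting broker...");

            System.out.println("port check passes: " + portIsOpen(port));
            System.out.println("log check passes:  " + logContains(log, READY_MARKER));

            // Only now does the simulated service report that it is ready.
            log.add(READY_MARKER);
            System.out.println("log check passes:  " + logContains(log, READY_MARKER));
        }
    }
}
```

In this sketch the port check can pass while the log check still fails, which
is the window in which a connection attempt could hit a broker that is not
ready.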
What do you think? Can I prepare a PR for this?
> RMQSourceITCase.testStopWithSavepoint and testAckFailure failed on azure due
> to connection error
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-24806
> URL: https://issues.apache.org/jira/browse/FLINK-24806
> Project: Flink
> Issue Type: Bug
> Components: Connectors/ RabbitMQ
> Affects Versions: 1.15.0
> Reporter: Yun Gao
> Priority: Major
>
> {code:java}
> Nov 05 22:12:57 [INFO] Running org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
> Nov 05 22:13:18 [ERROR] Tests run: 3, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 20.921 s <<< FAILURE! - in org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
> Nov 05 22:13:18 [ERROR] testStopWithSavepoint Time elapsed: 3.39 s <<< ERROR!
> Nov 05 22:13:18 java.io.IOException
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:396)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207)
> Nov 05 22:13:18 at org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.getRMQConnection(RMQSourceITCase.java:203)
> Nov 05 22:13:18 at org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.setUp(RMQSourceITCase.java:98)
> Nov 05 22:13:18 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Nov 05 22:13:18 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 05 22:13:18 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 05 22:13:18 at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 05 22:13:18 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 05 22:13:18 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 05 22:13:18
> Nov 05 22:13:18 [ERROR] testAckFailure Time elapsed: 1.842 s <<< ERROR!
> Nov 05 22:13:18 java.io.IOException
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:396)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
> Nov 05 22:13:18 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045)
> Nov 05 22:13:18 at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207)
> Nov 05 22:13:18 at org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.getRMQConnection(RMQSourceITCase.java:203)
> Nov 05 22:13:18 at org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.setUp(RMQSourceITCase.java:98)
> Nov 05 22:13:18 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Nov 05 22:13:18 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 05 22:13:18 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26065&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=13478]