[ 
https://issues.apache.org/jira/browse/FLINK-24806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440140#comment-17440140
 ] 

Michał Ciesielczyk commented on FLINK-24806:
--------------------------------------------

Hi [~gaoyunhaii] , the issue reported here is caused by the RabbitMQ service 
not started before trying to connect to it (so I'm not sure if this can be 
related to timeout issues).

In the current code, we wait for the RMQ socket to be opened during the 
container startup (here: 
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java#L94]
 ). It seems, it is possible that the port is opened, but the RabbitMQ service 
has not started yet.

To resolve it I would recommend to just remove this line.. If removed, it will 
use the default RabbitMQContainer wait strategy, which is to wait for a 
specific log message ("Server startup complete" to be specific). 

What do you think? Can I prepare a PR for this?

> RMQSourceITCase.testStopWithSavepoint and testAckFailure failed on azure due 
> to connection error
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24806
>                 URL: https://issues.apache.org/jira/browse/FLINK-24806
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors/ RabbitMQ
>    Affects Versions: 1.15.0
>            Reporter: Yun Gao
>            Priority: Major
>
> {code:java}
> Nov 05 22:12:57 [INFO] Running 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
> Nov 05 22:13:18 [ERROR] Tests run: 3, Failures: 0, Errors: 2, Skipped: 0, 
> Time elapsed: 20.921 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase
> Nov 05 22:13:18 [ERROR] testStopWithSavepoint  Time elapsed: 3.39 s  <<< 
> ERROR!
> Nov 05 22:13:18 java.io.IOException
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:396)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207)
> Nov 05 22:13:18       at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.getRMQConnection(RMQSourceITCase.java:203)
> Nov 05 22:13:18       at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.setUp(RMQSourceITCase.java:98)
> Nov 05 22:13:18       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 05 22:13:18       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 05 22:13:18       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 05 22:13:18       at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 05 22:13:18       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 05 22:13:18       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 05 22:13:18 
> Nov 05 22:13:18 [ERROR] testAckFailure  Time elapsed: 1.842 s  <<< ERROR!
> Nov 05 22:13:18 java.io.IOException
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:396)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045)
> Nov 05 22:13:18       at 
> com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207)
> Nov 05 22:13:18       at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.getRMQConnection(RMQSourceITCase.java:203)
> Nov 05 22:13:18       at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.setUp(RMQSourceITCase.java:98)
> Nov 05 22:13:18       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 05 22:13:18       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 05 22:13:18       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26065&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=13478]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to