This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1562a21  [FLINK-23322][connectors/rabbitmq] Increase RMQSource handshake timeout tolerating network congestions
1562a21 is described below

commit 1562a211697aa200bbaf8f61511d44d6e4ee3c24
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Jul 9 13:29:30 2021 +0200

    [FLINK-23322][connectors/rabbitmq] Increase RMQSource handshake timeout tolerating network congestions
---
 .../connectors/rabbitmq/RMQSourceITCase.java       |  9 +++++++
 .../src/test/resources/log4j2-test.properties      | 28 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
index 488b0e4..0e687f9 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
@@ -43,7 +43,10 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerImageName;
 
@@ -54,6 +57,10 @@ import java.util.concurrent.TimeoutException;
 /** A class containing RabbitMQ source tests against a real RabbiMQ cluster. */
 public class RMQSourceITCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(RMQSourceITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
+
+    private static final int HANDSHAKE_TIMEOUT = 30000;
     private static final int RABBITMQ_PORT = 5672;
     private static final String QUEUE_NAME = "test-queue";
     private static final JobID JOB_ID = new JobID();
@@ -75,6 +82,7 @@ public class RMQSourceITCase {
     public static final RabbitMQContainer RMQ_CONTAINER =
             new RabbitMQContainer(DockerImageName.parse(DockerImageVersions.RABBITMQ))
                     .withExposedPorts(RABBITMQ_PORT)
+                    .withLogConsumer(LOG_CONSUMER)
                     .waitingFor(Wait.forListeningPort());
 
     @Before
@@ -124,6 +132,7 @@ public class RMQSourceITCase {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setUsername(RMQ_CONTAINER.getAdminUsername());
         factory.setPassword(RMQ_CONTAINER.getAdminPassword());
+        factory.setHandshakeTimeout(HANDSHAKE_TIMEOUT);
         factory.setVirtualHost("/");
         factory.setHost(RMQ_CONTAINER.getHost());
         factory.setPort(RMQ_CONTAINER.getAmqpPort());
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

Reply via email to