This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1562a21 [FLINK-23322][connectors/rabbitmq] Increase RMQSource
handshake timeout tolerating network congestions
1562a21 is described below
commit 1562a211697aa200bbaf8f61511d44d6e4ee3c24
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Jul 9 13:29:30 2021 +0200
[FLINK-23322][connectors/rabbitmq] Increase RMQSource handshake timeout
tolerating network congestions
---
.../connectors/rabbitmq/RMQSourceITCase.java | 9 +++++++
.../src/test/resources/log4j2-test.properties | 28 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
index 488b0e4..0e687f9 100644
---
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
+++
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java
@@ -43,7 +43,10 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
@@ -54,6 +57,10 @@ import java.util.concurrent.TimeoutException;
/** A class containing RabbitMQ source tests against a real RabbiMQ cluster. */
public class RMQSourceITCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(RMQSourceITCase.class);
+ private static final Slf4jLogConsumer LOG_CONSUMER = new
Slf4jLogConsumer(LOG);
+
+ private static final int HANDSHAKE_TIMEOUT = 30000;
private static final int RABBITMQ_PORT = 5672;
private static final String QUEUE_NAME = "test-queue";
private static final JobID JOB_ID = new JobID();
@@ -75,6 +82,7 @@ public class RMQSourceITCase {
public static final RabbitMQContainer RMQ_CONTAINER =
new
RabbitMQContainer(DockerImageName.parse(DockerImageVersions.RABBITMQ))
.withExposedPorts(RABBITMQ_PORT)
+ .withLogConsumer(LOG_CONSUMER)
.waitingFor(Wait.forListeningPort());
@Before
@@ -124,6 +132,7 @@ public class RMQSourceITCase {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(RMQ_CONTAINER.getAdminUsername());
factory.setPassword(RMQ_CONTAINER.getAdminPassword());
+ factory.setHandshakeTimeout(HANDSHAKE_TIMEOUT);
factory.setVirtualHost("/");
factory.setHost(RMQ_CONTAINER.getHost());
factory.setPort(RMQ_CONTAINER.getAmqpPort());
diff --git
a/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties
b/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++
b/flink-connectors/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n