This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d44bc0e973cca6d03ed222158b7b5ce70306ab0 Author: Etienne Chauchot <[email protected]> AuthorDate: Tue Jan 25 16:16:04 2022 +0100 [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts --- .../cassandra/CassandraConnectorITCase.java | 41 ++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index cd85989..c1681d5 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -66,11 +66,16 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.CassandraContainer; +import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -81,6 +86,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.testcontainers.containers.output.Slf4jLogConsumer; + import scala.collection.JavaConverters; import scala.collection.Seq; @@ -97,15 +104,17 @@ public class CassandraConnectorITCase Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { - @ClassRule - public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer(); - private static final int MAX_CONNECTION_RETRY = 3; private static final long CONNECTION_RETRY_DELAY = 500L; private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); + private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); private static final String TABLE_POJO = "test"; private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace"; + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @ClassRule + public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer(); @Rule public final RetryRule retryRule = new RetryRule(); private static final int PORT = 9042; @@ -199,11 +208,13 @@ public class CassandraConnectorITCase public static CassandraContainer createCassandraContainer() { CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_3); cassandra.withJmxReporting(false); + cassandra.withLogConsumer(LOG_CONSUMER); return cassandra; } @BeforeClass public static void startAndInitializeCassandra() { + raiseCassandraRequestsTimeouts(); // CASSANDRA_CONTAINER#start() already contains retrials CASSANDRA_CONTAINER.start(); cluster = CASSANDRA_CONTAINER.getCluster(); @@ -237,6 +248,30 @@ public class CassandraConnectorITCase CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial")); } + private static void raiseCassandraRequestsTimeouts() { + try { + final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath(); + CASSANDRA_CONTAINER.copyFileFromContainer( + "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString()); + String configuration = + new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8); + String patchedConfiguration = + configuration + .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000") + .replaceAll( + "read_request_timeout_in_ms: [0-9]+", + "read_request_timeout_in_ms: 15000") + .replaceAll( + "write_request_timeout_in_ms: [0-9]+", + "write_request_timeout_in_ms: 6000"); + Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8)); + CASSANDRA_CONTAINER.withConfigurationOverride( + configurationPath.toAbsolutePath().toString()); + } catch (IOException e) { + throw new RuntimeException("Unable to open Cassandra configuration file ", e); + } + } + @Before public void createTable() { tableID = random.nextInt(Integer.MAX_VALUE);
