This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new 96482aa  [FLINK-28198] Integrate tests with CassandraTestEnvironment 
which manages the cassandra cluster container, session, retrials and timeouts. 
Cleaning
96482aa is described below

commit 96482aa6f9e673f11aa6d798f6f362716d41f983
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Apr 11 16:33:47 2023 +0200

    [FLINK-28198] Integrate tests with CassandraTestEnvironment which manages 
the cassandra cluster container, session, retrials and timeouts. Cleaning
---
 .../{source => }/CassandraTestEnvironment.java     |  39 ++-
 .../cassandra/source/CassandraSourceITCase.java    |   1 +
 .../cassandra/source/CassandraTestContext.java     |  48 ++-
 .../cassandra/CassandraConnectorITCase.java        | 332 +++++++--------------
 4 files changed, 156 insertions(+), 264 deletions(-)

diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
similarity index 85%
rename from 
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
rename to 
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 24b9e60..15b98d2 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.cassandra.source;
+package org.apache.flink.connector.cassandra;
 
 import org.apache.flink.connector.testframe.TestResource;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
@@ -24,10 +24,10 @@ import 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,14 +56,14 @@ public class CassandraTestEnvironment implements 
TestResource {
     // flushing mem table to SS tables is an asynchronous operation that may 
take a while
     private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
 
-    static final String KEYSPACE = "flink";
+    public static final String KEYSPACE = "flink";
 
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE "
                     + KEYSPACE
                     + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
 
-    static final String SPLITS_TABLE = "flinksplits";
+    public static final String SPLITS_TABLE = "flinksplits";
     private static final String CREATE_SPLITS_TABLE_QUERY =
             "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int 
PRIMARY KEY, counter int);";
     private static final String INSERT_INTO_FLINK_SPLITS =
@@ -75,7 +75,8 @@ public class CassandraTestEnvironment implements TestResource 
{
     boolean insertTestDataForSplitSizeTests;
     private Cluster cluster;
     private Session session;
-    private ClusterBuilder clusterBuilder;
+    private ClusterBuilder builderForReading;
+    private ClusterBuilder builderForWriting;
 
     public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
         this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
@@ -115,14 +116,21 @@ public class CassandraTestEnvironment implements 
TestResource {
                 OutputFrame.OutputType.STDOUT);
 
         cluster = cassandraContainer.getCluster();
-        clusterBuilder =
+        // ConsistencyLevel.ONE is the minimum level for reading
+        builderForReading =
                 createBuilderWithConsistencyLevel(
                         ConsistencyLevel.ONE,
                         cassandraContainer.getHost(),
                         cassandraContainer.getMappedPort(CQL_PORT));
 
+        // Lower consistency level ANY is only available for writing.
+        builderForWriting =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ANY,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
         session = cluster.connect();
-        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
         // create a dedicated table for split size tests (to avoid having to 
flush with each test)
         if (insertTestDataForSplitSizeTests) {
             insertTestDataForSplitSizeTests();
@@ -130,9 +138,9 @@ public class CassandraTestEnvironment implements 
TestResource {
     }
 
     private void insertTestDataForSplitSizeTests() throws Exception {
-        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        executeRequestWithTimeout(CREATE_SPLITS_TABLE_QUERY);
         for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
-            
session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, 
i)));
+            executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, 
i, i));
         }
         flushMemTables(SPLITS_TABLE);
     }
@@ -182,12 +190,17 @@ public class CassandraTestEnvironment implements 
TestResource {
         Thread.sleep(FLUSH_MEMTABLES_DELAY);
     }
 
-    static Statement requestWithTimeout(String query) {
-        return new 
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
+    public ResultSet executeRequestWithTimeout(String query) {
+        return session.execute(
+                new 
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS));
+    }
+
+    public ClusterBuilder getBuilderForReading() {
+        return builderForReading;
     }
 
-    public ClusterBuilder getClusterBuilder() {
-        return clusterBuilder;
+    public ClusterBuilder getBuilderForWriting() {
+        return builderForWriting;
     }
 
     public Session getSession() {
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
index 83ecaae..d6eecb1 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.cassandra.source;
 
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
 import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
 import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
 import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
index fb69f2b..8aa338a 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -20,15 +20,14 @@ package org.apache.flink.connector.cassandra.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
 import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 import org.apache.flink.connectors.cassandra.utils.Pojo;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
-import com.datastax.driver.core.Session;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 
@@ -60,13 +59,10 @@ public class CassandraTestContext implements 
DataStreamSourceExternalContext<Poj
 
     private final Mapper<Pojo> mapper;
     private final MapperOptions mapperOptions;
-    private final ClusterBuilder clusterBuilder;
-    private final Session session;
-    private ExternalSystemSplitDataWriter<Pojo> splitDataWriter;
+    private final CassandraTestEnvironment cassandraTestEnvironment;
 
     public CassandraTestContext(CassandraTestEnvironment 
cassandraTestEnvironment) {
-        clusterBuilder = cassandraTestEnvironment.getClusterBuilder();
-        session = cassandraTestEnvironment.getSession();
+        this.cassandraTestEnvironment = cassandraTestEnvironment;
         createTable();
         mapper = new 
MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class);
         mapperOptions = () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)};
@@ -87,7 +83,7 @@ public class CassandraTestContext implements 
DataStreamSourceExternalContext<Poj
             throws UnsupportedOperationException {
 
         return new CassandraSource<>(
-                clusterBuilder,
+                cassandraTestEnvironment.getBuilderForReading(),
                 Pojo.class,
                 String.format(
                         "SELECT * FROM %s.%s;", 
CassandraTestEnvironment.KEYSPACE, TABLE_NAME),
@@ -97,23 +93,21 @@ public class CassandraTestContext implements 
DataStreamSourceExternalContext<Poj
     @Override
     public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter(
             TestingSourceSettings sourceSettings) {
-        splitDataWriter =
-                new ExternalSystemSplitDataWriter<Pojo>() {
-
-                    @Override
-                    public void writeRecords(List<Pojo> records) {
-                        for (Pojo pojo : records) {
-                            mapper.save(pojo, 
mapperOptions.getMapperOptions());
-                        }
-                    }
-
-                    @Override
-                    public void close() {
-                        // nothing to do, cluster/session is shared at the 
CassandraTestEnvironment
-                        // level
-                    }
-                };
-        return splitDataWriter;
+        return new ExternalSystemSplitDataWriter<Pojo>() {
+
+            @Override
+            public void writeRecords(List<Pojo> records) {
+                for (Pojo pojo : records) {
+                    mapper.save(pojo, mapperOptions.getMapperOptions());
+                }
+            }
+
+            @Override
+            public void close() {
+                // nothing to do, cluster/session is shared at the 
CassandraTestEnvironment
+                // level
+            }
+        };
     }
 
     @Override
@@ -137,11 +131,11 @@ public class CassandraTestContext implements 
DataStreamSourceExternalContext<Poj
     }
 
     private void createTable() {
-        
session.execute(CassandraTestEnvironment.requestWithTimeout(CREATE_TABLE_QUERY));
+        cassandraTestEnvironment.executeRequestWithTimeout(CREATE_TABLE_QUERY);
     }
 
     private void dropTable() {
-        
session.execute(CassandraTestEnvironment.requestWithTimeout(DROP_TABLE_QUERY));
+        cassandraTestEnvironment.executeRequestWithTimeout(DROP_TABLE_QUERY);
     }
 
     static class CassandraTestContextFactory
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a560b95..f12e595 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,19 +45,10 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
 import org.apache.flink.types.Row;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.annotations.Table;
 import net.bytebuddy.ByteBuddy;
@@ -66,27 +58,14 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.CassandraContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
-import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -94,12 +73,11 @@ import java.util.UUID;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
+import static 
org.apache.flink.connector.cassandra.CassandraTestEnvironment.KEYSPACE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for all cassandra sinks. */
 @SuppressWarnings("serial")
-// NoHostAvailableException is raised by Cassandra client under load while 
connecting to the cluster
-@RetryOnException(times = 3, exception = NoHostAvailableException.class)
 @Testcontainers
 @ExtendWith(RetryExtension.class)
 class CassandraConnectorITCase
@@ -107,66 +85,14 @@ class CassandraConnectorITCase
                 Tuple3<String, Integer, Integer>,
                 CassandraTupleWriteAheadSink<Tuple3<String, Integer, 
Integer>>> {
 
-    private static final String CASSANDRA_4_0 = "cassandra:4.0.3";
-    private static final int MAX_CONNECTION_RETRY = 3;
-    private static final long CONNECTION_RETRY_DELAY = 500L;
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraConnectorITCase.class);
-    private static final Slf4jLogConsumer LOG_CONSUMER = new 
Slf4jLogConsumer(LOG);
-
-    @TempDir static Path tmpDir;
-
-    private static final int READ_TIMEOUT_MILLIS = 36000;
-
-    @Container static final CassandraContainer CASSANDRA_CONTAINER = 
createCassandraContainer();
-
-    private static final int PORT = 9042;
-
-    private static Cluster cluster;
-    private static Session session;
-
-    private final ClusterBuilder builderForReading =
-            createBuilderWithConsistencyLevel(ConsistencyLevel.ONE);
-    // Lower consistency level ANY is only available for writing.
-    private final ClusterBuilder builderForWriting =
-            createBuilderWithConsistencyLevel(ConsistencyLevel.ANY);
-
-    private ClusterBuilder createBuilderWithConsistencyLevel(ConsistencyLevel 
consistencyLevel) {
-        return new ClusterBuilder() {
-            @Override
-            protected Cluster buildCluster(Cluster.Builder builder) {
-                return builder.addContactPointsWithPorts(
-                                new InetSocketAddress(
-                                        CASSANDRA_CONTAINER.getHost(),
-                                        
CASSANDRA_CONTAINER.getMappedPort(PORT)))
-                        .withQueryOptions(
-                                new QueryOptions()
-                                        .setConsistencyLevel(consistencyLevel)
-                                        
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-                        .withSocketOptions(
-                                new SocketOptions()
-                                        // default timeout x 3
-                                        .setConnectTimeoutMillis(15000)
-                                        // default timeout x3 and higher than
-                                        // request_timeout_in_ms at the 
cluster level
-                                        
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
-                        .withoutJMXReporting()
-                        .withoutMetrics()
-                        .build();
-            }
-        };
-    }
+    private static final CassandraTestEnvironment cassandraTestEnvironment =
+            new CassandraTestEnvironment(false);
 
     private static final String TABLE_NAME_PREFIX = "flink_";
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
-    private static final String KEYSPACE = "flink";
     private static final String TUPLE_ID_FIELD = "id";
     private static final String TUPLE_COUNTER_FIELD = "counter";
     private static final String TUPLE_BATCHID_FIELD = "batch_id";
-    private static final String CREATE_KEYSPACE_QUERY =
-            "CREATE KEYSPACE "
-                    + KEYSPACE
-                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE "
                     + KEYSPACE
@@ -267,43 +193,12 @@ class CassandraConnectorITCase
     //  Utility methods
     // ------------------------------------------------------------------------
 
-    public static CassandraContainer createCassandraContainer() {
-        CassandraContainer cassandra = new CassandraContainer(CASSANDRA_4_0);
-        cassandra.withJmxReporting(false);
-        cassandra.withLogConsumer(LOG_CONSUMER);
-        return cassandra;
-    }
-
-    private static void raiseCassandraRequestsTimeouts() {
-        try {
-            final Path configurationPath = 
Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
-            CASSANDRA_CONTAINER.copyFileFromContainer(
-                    "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
-            String configuration =
-                    new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
-            String patchedConfiguration =
-                    configuration
-                            .replaceAll(
-                                    "request_timeout_in_ms: [0-9]+",
-                                    "request_timeout_in_ms: 30000") // x3 
default timeout
-                            .replaceAll(
-                                    "read_request_timeout_in_ms: [0-9]+",
-                                    "read_request_timeout_in_ms: 15000") // x3 
default timeout
-                            .replaceAll(
-                                    "write_request_timeout_in_ms: [0-9]+",
-                                    "write_request_timeout_in_ms: 6000"); // 
x3 default timeout
-            CASSANDRA_CONTAINER.copyFileToContainer(
-                    
Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
-                    "/etc/cassandra/cassandra.yaml");
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to open Cassandra configuration 
file ", e);
-        }
-    }
-
     private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) {
         final CassandraPojoInputFormat<T> source =
                 new CassandraPojoInputFormat<>(
-                        injectTableName(SELECT_DATA_QUERY), builderForReading, 
annotatedPojoClass);
+                        injectTableName(SELECT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForReading(),
+                        annotatedPojoClass);
         List<T> result = new ArrayList<>();
 
         try {
@@ -322,7 +217,7 @@ class CassandraConnectorITCase
     private <T> List<T> writePojosWithOutputFormat(Class<T> 
annotatedPojoClass) throws Exception {
         final CassandraPojoOutputFormat<T> sink =
                 new CassandraPojoOutputFormat<>(
-                        builderForWriting,
+                        cassandraTestEnvironment.getBuilderForWriting(),
                         annotatedPojoClass,
                         () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
 
@@ -357,60 +252,26 @@ class CassandraConnectorITCase
     // ------------------------------------------------------------------------
 
     @BeforeAll
-    static void startAndInitializeCassandra() {
-        raiseCassandraRequestsTimeouts();
-        // CASSANDRA_CONTAINER#start() already contains retrials
-        CASSANDRA_CONTAINER.start();
-        cluster = CASSANDRA_CONTAINER.getCluster();
-        int retried = 0;
-        while (retried < MAX_CONNECTION_RETRY) {
-            try {
-                session = cluster.connect();
-                break;
-            } catch (NoHostAvailableException e) {
-                retried++;
-                LOG.debug(
-                        "Connection failed with NoHostAvailableException : 
retry number {}, will retry to connect within {} ms",
-                        retried,
-                        CONNECTION_RETRY_DELAY);
-                if (retried == MAX_CONNECTION_RETRY) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Failed to connect to Cassandra cluster 
after %d retries every %d ms",
-                                    retried, CONNECTION_RETRY_DELAY),
-                            e);
-                }
-                try {
-                    Thread.sleep(CONNECTION_RETRY_DELAY);
-                } catch (InterruptedException ignored) {
-                }
-            }
-        }
-        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+    static void startUp() throws Exception {
+        cassandraTestEnvironment.startUp();
     }
 
     @BeforeEach
     void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);
-        
session.execute(requestWithTimeout(injectTableName(CREATE_TABLE_QUERY)));
+        
cassandraTestEnvironment.executeRequestWithTimeout(injectTableName(CREATE_TABLE_QUERY));
     }
 
     @AfterAll
-    static void closeCassandra() {
-        if (session != null) {
-            session.close();
-        }
-        if (cluster != null) {
-            cluster.close();
-        }
-        CASSANDRA_CONTAINER.stop();
+    static void tearDown() throws Exception {
+        cassandraTestEnvironment.tearDown();
     }
 
     // ------------------------------------------------------------------------
     //  Technical Tests
     // ------------------------------------------------------------------------
 
-    @TestTemplate
+    @Test
     void testAnnotatePojoWithTable() {
         final String tableName = TABLE_NAME_PREFIX + tableID;
 
@@ -419,22 +280,6 @@ class CassandraConnectorITCase
         assertThat(pojoTableAnnotation.name()).contains(tableName);
     }
 
-    @TestTemplate
-    void testRaiseCassandraRequestsTimeouts() throws IOException {
-        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
-        // do not change the container conf twice, just assert that it was 
indeed changed in the
-        // container
-        final Path configurationPath = 
Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
-        CASSANDRA_CONTAINER.copyFileFromContainer(
-                "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
-        final String configuration =
-                new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
-        assertThat(configuration)
-                .contains("request_timeout_in_ms: 30000")
-                .contains("read_request_timeout_in_ms: 15000")
-                .contains("write_request_timeout_in_ms: 6000");
-    }
-
     // ------------------------------------------------------------------------
     //  Exactly-once Tests
     // ------------------------------------------------------------------------
@@ -446,8 +291,8 @@ class CassandraConnectorITCase
                 injectTableName(INSERT_DATA_QUERY),
                 TypeExtractor.getForObject(new Tuple3<>("", 0, 0))
                         .createSerializer(new ExecutionConfig()),
-                builderForReading,
-                new CassandraCommitter(builderForReading));
+                cassandraTestEnvironment.getBuilderForReading(),
+                new 
CassandraCommitter(cassandraTestEnvironment.getBuilderForReading()));
     }
 
     @Override
@@ -464,7 +309,9 @@ class CassandraConnectorITCase
     protected void verifyResultsIdealCircumstances(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
-        ResultSet result = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet result =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         ArrayList<Integer> list = new ArrayList<>();
         for (int x = 1; x <= 60; x++) {
             list.add(x);
@@ -482,7 +329,9 @@ class CassandraConnectorITCase
     protected void verifyResultsDataPersistenceUponMissedNotify(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
-        ResultSet result = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet result =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         ArrayList<Integer> list = new ArrayList<>();
         for (int x = 1; x <= 60; x++) {
             list.add(x);
@@ -500,7 +349,9 @@ class CassandraConnectorITCase
     protected void verifyResultsDataDiscardingUponRestore(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
-        ResultSet result = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet result =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         ArrayList<Integer> list = new ArrayList<>();
         for (int x = 1; x <= 20; x++) {
             list.add(x);
@@ -534,7 +385,9 @@ class CassandraConnectorITCase
         }
 
         ArrayList<Integer> actual = new ArrayList<>();
-        ResultSet result = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet result =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
 
         for (com.datastax.driver.core.Row s : result) {
             actual.add(s.getInt(TUPLE_COUNTER_FIELD));
@@ -544,18 +397,24 @@ class CassandraConnectorITCase
         assertThat(actual.toArray()).isEqualTo(expected.toArray());
     }
 
-    @TestTemplate
+    @Test
     void testCassandraCommitter() throws Exception {
         String jobID = new JobID().toString();
-        CassandraCommitter cc1 = new CassandraCommitter(builderForReading, 
"flink_auxiliary_cc");
+        CassandraCommitter cc1 =
+                new CassandraCommitter(
+                        cassandraTestEnvironment.getBuilderForReading(), 
"flink_auxiliary_cc");
         cc1.setJobId(jobID);
         cc1.setOperatorId("operator");
 
-        CassandraCommitter cc2 = new CassandraCommitter(builderForReading, 
"flink_auxiliary_cc");
+        CassandraCommitter cc2 =
+                new CassandraCommitter(
+                        cassandraTestEnvironment.getBuilderForReading(), 
"flink_auxiliary_cc");
         cc2.setJobId(jobID);
         cc2.setOperatorId("operator");
 
-        CassandraCommitter cc3 = new CassandraCommitter(builderForReading, 
"flink_auxiliary_cc");
+        CassandraCommitter cc3 =
+                new CassandraCommitter(
+                        cassandraTestEnvironment.getBuilderForReading(), 
"flink_auxiliary_cc");
         cc3.setJobId(jobID);
         cc3.setOperatorId("operator1");
 
@@ -582,7 +441,9 @@ class CassandraConnectorITCase
         cc2.close();
         cc3.close();
 
-        cc1 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
+        cc1 =
+                new CassandraCommitter(
+                        cassandraTestEnvironment.getBuilderForReading(), 
"flink_auxiliary_cc");
         cc1.setJobId(jobID);
         cc1.setOperatorId("operator");
 
@@ -600,10 +461,12 @@ class CassandraConnectorITCase
     //  At-least-once Tests
     // ------------------------------------------------------------------------
 
-    @TestTemplate
+    @Test
     void testCassandraTupleAtLeastOnceSink() throws Exception {
         CassandraTupleSink<Tuple3<String, Integer, Integer>> sink =
-                new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), 
builderForWriting);
+                new CassandraTupleSink<>(
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
         try {
             sink.open(new Configuration());
             for (Tuple3<String, Integer, Integer> value : collection) {
@@ -613,15 +476,19 @@ class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         assertThat(rs.all()).hasSize(20);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraRowAtLeastOnceSink() throws Exception {
         CassandraRowSink sink =
                 new CassandraRowSink(
-                        FIELD_TYPES.length, 
injectTableName(INSERT_DATA_QUERY), builderForWriting);
+                        FIELD_TYPES.length,
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
         try {
             sink.open(new Configuration());
             for (Row value : rowCollection) {
@@ -631,26 +498,32 @@ class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         assertThat(rs.all()).hasSize(20);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraPojoAtLeastOnceSink() throws Exception {
         final Class<? extends Pojo> annotatedPojoClass =
                 annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
         writePojos(annotatedPojoClass, null);
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         assertThat(rs.all()).hasSize(20);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws 
Exception {
         final Class<? extends Pojo> annotatedPojoClass =
                 annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
         writePojos(annotatedPojoClass, KEYSPACE);
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         assertThat(rs.all()).hasSize(20);
     }
 
@@ -658,7 +531,11 @@ class CassandraConnectorITCase
             throws Exception {
         final Constructor<T> pojoConstructor = 
getPojoConstructor(annotatedPojoClass);
         CassandraPojoSink<T> sink =
-                new CassandraPojoSink<>(annotatedPojoClass, builderForWriting, 
null, keyspace);
+                new CassandraPojoSink<>(
+                        annotatedPojoClass,
+                        cassandraTestEnvironment.getBuilderForWriting(),
+                        null,
+                        keyspace);
         try {
             sink.open(new Configuration());
             for (int x = 0; x < 20; x++) {
@@ -669,7 +546,7 @@ class CassandraConnectorITCase
         }
     }
 
-    @TestTemplate
+    @Test
     void testCassandraTableSink() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(4);
@@ -682,7 +559,8 @@ class CassandraConnectorITCase
                 .registerTableSinkInternal(
                         "cassandraTable",
                         new CassandraAppendTableSink(
-                                        builderForWriting, 
injectTableName(INSERT_DATA_QUERY))
+                                        
cassandraTestEnvironment.getBuilderForWriting(),
+                                        injectTableName(INSERT_DATA_QUERY))
                                 .configure(
                                         new String[] {"f0", "f1", "f2"},
                                         new TypeInformation[] {
@@ -691,7 +569,9 @@ class CassandraConnectorITCase
 
         tEnv.sqlQuery("select * from 
testFlinkTable").executeInsert("cassandraTable").await();
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
 
         // validate that all input was correctly written to Cassandra
         List<Row> input = new ArrayList<>(rowCollection);
@@ -710,23 +590,16 @@ class CassandraConnectorITCase
 
     private static int retrialsCount = 0;
 
-    @TestTemplate
-    void testRetrial() {
-        annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
-        if (retrialsCount < 2) {
-            retrialsCount++;
-            throw new NoHostAvailableException(new HashMap<>());
-        }
-    }
-
-    @TestTemplate
+    @Test
     void testCassandraBatchPojoFormat() throws Exception {
 
         final Class<? extends Pojo> annotatedPojoClass =
                 annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
 
         final List<? extends Pojo> pojos = 
writePojosWithOutputFormat(annotatedPojoClass);
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         assertThat(rs.all()).hasSize(20);
 
         final List<? extends Pojo> result = 
readPojosWithInputFormat(annotatedPojoClass);
@@ -739,11 +612,12 @@ class CassandraConnectorITCase
                 .isEqualTo(pojos);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraBatchTupleFormat() throws Exception {
         OutputFormat<Tuple3<String, Integer, Integer>> sink =
                 new CassandraTupleOutputFormat<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
         try {
             sink.configure(new Configuration());
             sink.open(0, 1);
@@ -756,7 +630,8 @@ class CassandraConnectorITCase
 
         sink =
                 new CassandraTupleOutputFormat<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
         try {
             sink.configure(new Configuration());
             sink.open(0, 1);
@@ -768,7 +643,9 @@ class CassandraConnectorITCase
         }
 
         InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source =
-                new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), 
builderForReading);
+                new CassandraInputFormat<>(
+                        injectTableName(SELECT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForReading());
         List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
         try {
             source.configure(new Configuration());
@@ -783,10 +660,12 @@ class CassandraConnectorITCase
         assertThat(result).hasSize(20);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraBatchRowFormat() throws Exception {
         OutputFormat<Row> sink =
-                new 
CassandraRowOutputFormat(injectTableName(INSERT_DATA_QUERY), builderForWriting);
+                new CassandraRowOutputFormat(
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
         try {
             sink.configure(new Configuration());
             sink.open(0, 1);
@@ -798,12 +677,14 @@ class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         List<com.datastax.driver.core.Row> rows = rs.all();
         assertThat(rows).hasSameSizeAs(rowCollection);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws 
Exception {
         Class<scala.Tuple1<String>> c =
                 (Class<scala.Tuple1<String>>) new 
scala.Tuple1<>("hello").getClass();
@@ -833,11 +714,12 @@ class CassandraConnectorITCase
         
assertThat(sinkBuilder).isInstanceOf(CassandraSink.CassandraScalaProductSinkBuilder.class);
     }
 
-    @TestTemplate
+    @Test
     void testCassandraScalaTupleAtLeastSink() throws Exception {
         CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink 
=
                 new CassandraScalaProductSink<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting());
 
         List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = 
new ArrayList<>(20);
         for (int i = 0; i < 20; i++) {
@@ -852,7 +734,9 @@ class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         List<com.datastax.driver.core.Row> rows = rs.all();
         assertThat(rows).hasSameSizeAs(scalaTupleCollection);
 
@@ -866,13 +750,15 @@ class CassandraConnectorITCase
         assertThat(scalaTupleCollection).isEmpty();
     }
 
-    @TestTemplate
+    @Test
     void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
         CassandraSinkBaseConfig config =
                 
CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
         CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink 
=
                 new CassandraScalaProductSink<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting, 
config);
+                        injectTableName(INSERT_DATA_QUERY),
+                        cassandraTestEnvironment.getBuilderForWriting(),
+                        config);
 
         String id = UUID.randomUUID().toString();
         Integer counter = 1;
@@ -892,7 +778,9 @@ class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+        ResultSet rs =
+                cassandraTestEnvironment.executeRequestWithTimeout(
+                        injectTableName(SELECT_DATA_QUERY));
         List<com.datastax.driver.core.Row> rows = rs.all();
         assertThat(rows).hasSize(1);
         // Since nulls are ignored, we should be reading one complete record
@@ -905,8 +793,4 @@ class CassandraConnectorITCase
                     .isEqualTo(new scala.Tuple3<>(id, counter, batchId));
         }
     }
-
-    private static Statement requestWithTimeout(String query) {
-        return new 
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
-    }
 }


Reply via email to