This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 96482aa [FLINK-28198] Integrate tests with CassandraTestEnvironment
which manages the cassandra cluster container, session, retrials and timeouts.
Cleaning
96482aa is described below
commit 96482aa6f9e673f11aa6d798f6f362716d41f983
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Apr 11 16:33:47 2023 +0200
[FLINK-28198] Integrate tests with CassandraTestEnvironment which manages
the cassandra cluster container, session, retrials and timeouts. Cleaning
---
.../{source => }/CassandraTestEnvironment.java | 39 ++-
.../cassandra/source/CassandraSourceITCase.java | 1 +
.../cassandra/source/CassandraTestContext.java | 48 ++-
.../cassandra/CassandraConnectorITCase.java | 332 +++++++--------------
4 files changed, 156 insertions(+), 264 deletions(-)
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
similarity index 85%
rename from
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
rename to
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 24b9e60..15b98d2 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.connector.cassandra.source;
+package org.apache.flink.connector.cassandra;
import org.apache.flink.connector.testframe.TestResource;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
@@ -24,10 +24,10 @@ import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +56,14 @@ public class CassandraTestEnvironment implements
TestResource {
// flushing mem table to SS tables is an asynchronous operation that may
take a while
private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
- static final String KEYSPACE = "flink";
+ public static final String KEYSPACE = "flink";
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE "
+ KEYSPACE
+ " WITH replication= {'class':'SimpleStrategy',
'replication_factor':1};";
- static final String SPLITS_TABLE = "flinksplits";
+ public static final String SPLITS_TABLE = "flinksplits";
private static final String CREATE_SPLITS_TABLE_QUERY =
"CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int
PRIMARY KEY, counter int);";
private static final String INSERT_INTO_FLINK_SPLITS =
@@ -75,7 +75,8 @@ public class CassandraTestEnvironment implements TestResource
{
boolean insertTestDataForSplitSizeTests;
private Cluster cluster;
private Session session;
- private ClusterBuilder clusterBuilder;
+ private ClusterBuilder builderForReading;
+ private ClusterBuilder builderForWriting;
public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
@@ -115,14 +116,21 @@ public class CassandraTestEnvironment implements
TestResource {
OutputFrame.OutputType.STDOUT);
cluster = cassandraContainer.getCluster();
- clusterBuilder =
+ // ConsistencyLevel.ONE is the minimum level for reading
+ builderForReading =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ONE,
cassandraContainer.getHost(),
cassandraContainer.getMappedPort(CQL_PORT));
+ // Lower consistency level ANY is only available for writing.
+ builderForWriting =
+ createBuilderWithConsistencyLevel(
+ ConsistencyLevel.ANY,
+ cassandraContainer.getHost(),
+ cassandraContainer.getMappedPort(CQL_PORT));
session = cluster.connect();
- session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+ executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
// create a dedicated table for split size tests (to avoid having to
flush with each test)
if (insertTestDataForSplitSizeTests) {
insertTestDataForSplitSizeTests();
@@ -130,9 +138,9 @@ public class CassandraTestEnvironment implements
TestResource {
}
private void insertTestDataForSplitSizeTests() throws Exception {
- session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+ executeRequestWithTimeout(CREATE_SPLITS_TABLE_QUERY);
for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
-
session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i,
i)));
+ executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS,
i, i));
}
flushMemTables(SPLITS_TABLE);
}
@@ -182,12 +190,17 @@ public class CassandraTestEnvironment implements
TestResource {
Thread.sleep(FLUSH_MEMTABLES_DELAY);
}
- static Statement requestWithTimeout(String query) {
- return new
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
+ public ResultSet executeRequestWithTimeout(String query) {
+ return session.execute(
+ new
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS));
+ }
+
+ public ClusterBuilder getBuilderForReading() {
+ return builderForReading;
}
- public ClusterBuilder getClusterBuilder() {
- return clusterBuilder;
+ public ClusterBuilder getBuilderForWriting() {
+ return builderForWriting;
}
public Session getSession() {
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
index 83ecaae..d6eecb1 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.cassandra.source;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
import
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
index fb69f2b..8aa338a 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -20,15 +20,14 @@ package org.apache.flink.connector.cassandra.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.connectors.cassandra.utils.Pojo;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
-import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
@@ -60,13 +59,10 @@ public class CassandraTestContext implements
DataStreamSourceExternalContext<Poj
private final Mapper<Pojo> mapper;
private final MapperOptions mapperOptions;
- private final ClusterBuilder clusterBuilder;
- private final Session session;
- private ExternalSystemSplitDataWriter<Pojo> splitDataWriter;
+ private final CassandraTestEnvironment cassandraTestEnvironment;
public CassandraTestContext(CassandraTestEnvironment
cassandraTestEnvironment) {
- clusterBuilder = cassandraTestEnvironment.getClusterBuilder();
- session = cassandraTestEnvironment.getSession();
+ this.cassandraTestEnvironment = cassandraTestEnvironment;
createTable();
mapper = new
MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class);
mapperOptions = () -> new Mapper.Option[]
{Mapper.Option.saveNullFields(true)};
@@ -87,7 +83,7 @@ public class CassandraTestContext implements
DataStreamSourceExternalContext<Poj
throws UnsupportedOperationException {
return new CassandraSource<>(
- clusterBuilder,
+ cassandraTestEnvironment.getBuilderForReading(),
Pojo.class,
String.format(
"SELECT * FROM %s.%s;",
CassandraTestEnvironment.KEYSPACE, TABLE_NAME),
@@ -97,23 +93,21 @@ public class CassandraTestContext implements
DataStreamSourceExternalContext<Poj
@Override
public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter(
TestingSourceSettings sourceSettings) {
- splitDataWriter =
- new ExternalSystemSplitDataWriter<Pojo>() {
-
- @Override
- public void writeRecords(List<Pojo> records) {
- for (Pojo pojo : records) {
- mapper.save(pojo,
mapperOptions.getMapperOptions());
- }
- }
-
- @Override
- public void close() {
- // nothing to do, cluster/session is shared at the
CassandraTestEnvironment
- // level
- }
- };
- return splitDataWriter;
+ return new ExternalSystemSplitDataWriter<Pojo>() {
+
+ @Override
+ public void writeRecords(List<Pojo> records) {
+ for (Pojo pojo : records) {
+ mapper.save(pojo, mapperOptions.getMapperOptions());
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do, cluster/session is shared at the
CassandraTestEnvironment
+ // level
+ }
+ };
}
@Override
@@ -137,11 +131,11 @@ public class CassandraTestContext implements
DataStreamSourceExternalContext<Poj
}
private void createTable() {
-
session.execute(CassandraTestEnvironment.requestWithTimeout(CREATE_TABLE_QUERY));
+ cassandraTestEnvironment.executeRequestWithTimeout(CREATE_TABLE_QUERY);
}
private void dropTable() {
-
session.execute(CassandraTestEnvironment.requestWithTimeout(DROP_TABLE_QUERY));
+ cassandraTestEnvironment.executeRequestWithTimeout(DROP_TABLE_QUERY);
}
static class CassandraTestContextFactory
diff --git
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a560b95..f12e595 100644
---
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,6 +36,7 @@ import
org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,19 +45,10 @@ import
org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
import org.apache.flink.types.Row;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.annotations.Table;
import net.bytebuddy.ByteBuddy;
@@ -66,27 +58,14 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.CassandraContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -94,12 +73,11 @@ import java.util.UUID;
import scala.collection.JavaConverters;
import scala.collection.Seq;
+import static
org.apache.flink.connector.cassandra.CassandraTestEnvironment.KEYSPACE;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for all cassandra sinks. */
@SuppressWarnings("serial")
-// NoHostAvailableException is raised by Cassandra client under load while
connecting to the cluster
-@RetryOnException(times = 3, exception = NoHostAvailableException.class)
@Testcontainers
@ExtendWith(RetryExtension.class)
class CassandraConnectorITCase
@@ -107,66 +85,14 @@ class CassandraConnectorITCase
Tuple3<String, Integer, Integer>,
CassandraTupleWriteAheadSink<Tuple3<String, Integer,
Integer>>> {
- private static final String CASSANDRA_4_0 = "cassandra:4.0.3";
- private static final int MAX_CONNECTION_RETRY = 3;
- private static final long CONNECTION_RETRY_DELAY = 500L;
-
- private static final Logger LOG =
LoggerFactory.getLogger(CassandraConnectorITCase.class);
- private static final Slf4jLogConsumer LOG_CONSUMER = new
Slf4jLogConsumer(LOG);
-
- @TempDir static Path tmpDir;
-
- private static final int READ_TIMEOUT_MILLIS = 36000;
-
- @Container static final CassandraContainer CASSANDRA_CONTAINER =
createCassandraContainer();
-
- private static final int PORT = 9042;
-
- private static Cluster cluster;
- private static Session session;
-
- private final ClusterBuilder builderForReading =
- createBuilderWithConsistencyLevel(ConsistencyLevel.ONE);
- // Lower consistency level ANY is only available for writing.
- private final ClusterBuilder builderForWriting =
- createBuilderWithConsistencyLevel(ConsistencyLevel.ANY);
-
- private ClusterBuilder createBuilderWithConsistencyLevel(ConsistencyLevel
consistencyLevel) {
- return new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPointsWithPorts(
- new InetSocketAddress(
- CASSANDRA_CONTAINER.getHost(),
-
CASSANDRA_CONTAINER.getMappedPort(PORT)))
- .withQueryOptions(
- new QueryOptions()
- .setConsistencyLevel(consistencyLevel)
-
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
- .withSocketOptions(
- new SocketOptions()
- // default timeout x 3
- .setConnectTimeoutMillis(15000)
- // default timeout x3 and higher than
- // request_timeout_in_ms at the
cluster level
-
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
- .withoutJMXReporting()
- .withoutMetrics()
- .build();
- }
- };
- }
+ private static final CassandraTestEnvironment cassandraTestEnvironment =
+ new CassandraTestEnvironment(false);
private static final String TABLE_NAME_PREFIX = "flink_";
private static final String TABLE_NAME_VARIABLE = "$TABLE";
- private static final String KEYSPACE = "flink";
private static final String TUPLE_ID_FIELD = "id";
private static final String TUPLE_COUNTER_FIELD = "counter";
private static final String TUPLE_BATCHID_FIELD = "batch_id";
- private static final String CREATE_KEYSPACE_QUERY =
- "CREATE KEYSPACE "
- + KEYSPACE
- + " WITH replication= {'class':'SimpleStrategy',
'replication_factor':1};";
private static final String CREATE_TABLE_QUERY =
"CREATE TABLE "
+ KEYSPACE
@@ -267,43 +193,12 @@ class CassandraConnectorITCase
// Utility methods
// ------------------------------------------------------------------------
- public static CassandraContainer createCassandraContainer() {
- CassandraContainer cassandra = new CassandraContainer(CASSANDRA_4_0);
- cassandra.withJmxReporting(false);
- cassandra.withLogConsumer(LOG_CONSUMER);
- return cassandra;
- }
-
- private static void raiseCassandraRequestsTimeouts() {
- try {
- final Path configurationPath =
Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
- CASSANDRA_CONTAINER.copyFileFromContainer(
- "/etc/cassandra/cassandra.yaml",
configurationPath.toAbsolutePath().toString());
- String configuration =
- new String(Files.readAllBytes(configurationPath),
StandardCharsets.UTF_8);
- String patchedConfiguration =
- configuration
- .replaceAll(
- "request_timeout_in_ms: [0-9]+",
- "request_timeout_in_ms: 30000") // x3
default timeout
- .replaceAll(
- "read_request_timeout_in_ms: [0-9]+",
- "read_request_timeout_in_ms: 15000") // x3
default timeout
- .replaceAll(
- "write_request_timeout_in_ms: [0-9]+",
- "write_request_timeout_in_ms: 6000"); //
x3 default timeout
- CASSANDRA_CONTAINER.copyFileToContainer(
-
Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
- "/etc/cassandra/cassandra.yaml");
- } catch (IOException e) {
- throw new RuntimeException("Unable to open Cassandra configuration
file ", e);
- }
- }
-
private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) {
final CassandraPojoInputFormat<T> source =
new CassandraPojoInputFormat<>(
- injectTableName(SELECT_DATA_QUERY), builderForReading,
annotatedPojoClass);
+ injectTableName(SELECT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForReading(),
+ annotatedPojoClass);
List<T> result = new ArrayList<>();
try {
@@ -322,7 +217,7 @@ class CassandraConnectorITCase
private <T> List<T> writePojosWithOutputFormat(Class<T>
annotatedPojoClass) throws Exception {
final CassandraPojoOutputFormat<T> sink =
new CassandraPojoOutputFormat<>(
- builderForWriting,
+ cassandraTestEnvironment.getBuilderForWriting(),
annotatedPojoClass,
() -> new Mapper.Option[]
{Mapper.Option.saveNullFields(true)});
@@ -357,60 +252,26 @@ class CassandraConnectorITCase
// ------------------------------------------------------------------------
@BeforeAll
- static void startAndInitializeCassandra() {
- raiseCassandraRequestsTimeouts();
- // CASSANDRA_CONTAINER#start() already contains retrials
- CASSANDRA_CONTAINER.start();
- cluster = CASSANDRA_CONTAINER.getCluster();
- int retried = 0;
- while (retried < MAX_CONNECTION_RETRY) {
- try {
- session = cluster.connect();
- break;
- } catch (NoHostAvailableException e) {
- retried++;
- LOG.debug(
- "Connection failed with NoHostAvailableException :
retry number {}, will retry to connect within {} ms",
- retried,
- CONNECTION_RETRY_DELAY);
- if (retried == MAX_CONNECTION_RETRY) {
- throw new RuntimeException(
- String.format(
- "Failed to connect to Cassandra cluster
after %d retries every %d ms",
- retried, CONNECTION_RETRY_DELAY),
- e);
- }
- try {
- Thread.sleep(CONNECTION_RETRY_DELAY);
- } catch (InterruptedException ignored) {
- }
- }
- }
- session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+ static void startUp() throws Exception {
+ cassandraTestEnvironment.startUp();
}
@BeforeEach
void createTable() {
tableID = random.nextInt(Integer.MAX_VALUE);
-
session.execute(requestWithTimeout(injectTableName(CREATE_TABLE_QUERY)));
+
cassandraTestEnvironment.executeRequestWithTimeout(injectTableName(CREATE_TABLE_QUERY));
}
@AfterAll
- static void closeCassandra() {
- if (session != null) {
- session.close();
- }
- if (cluster != null) {
- cluster.close();
- }
- CASSANDRA_CONTAINER.stop();
+ static void tearDown() throws Exception {
+ cassandraTestEnvironment.tearDown();
}
// ------------------------------------------------------------------------
// Technical Tests
// ------------------------------------------------------------------------
- @TestTemplate
+ @Test
void testAnnotatePojoWithTable() {
final String tableName = TABLE_NAME_PREFIX + tableID;
@@ -419,22 +280,6 @@ class CassandraConnectorITCase
assertThat(pojoTableAnnotation.name()).contains(tableName);
}
- @TestTemplate
- void testRaiseCassandraRequestsTimeouts() throws IOException {
- // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
- // do not change the container conf twice, just assert that it was
indeed changed in the
- // container
- final Path configurationPath =
Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
- CASSANDRA_CONTAINER.copyFileFromContainer(
- "/etc/cassandra/cassandra.yaml",
configurationPath.toAbsolutePath().toString());
- final String configuration =
- new String(Files.readAllBytes(configurationPath),
StandardCharsets.UTF_8);
- assertThat(configuration)
- .contains("request_timeout_in_ms: 30000")
- .contains("read_request_timeout_in_ms: 15000")
- .contains("write_request_timeout_in_ms: 6000");
- }
-
// ------------------------------------------------------------------------
// Exactly-once Tests
// ------------------------------------------------------------------------
@@ -446,8 +291,8 @@ class CassandraConnectorITCase
injectTableName(INSERT_DATA_QUERY),
TypeExtractor.getForObject(new Tuple3<>("", 0, 0))
.createSerializer(new ExecutionConfig()),
- builderForReading,
- new CassandraCommitter(builderForReading));
+ cassandraTestEnvironment.getBuilderForReading(),
+ new
CassandraCommitter(cassandraTestEnvironment.getBuilderForReading()));
}
@Override
@@ -464,7 +309,9 @@ class CassandraConnectorITCase
protected void verifyResultsIdealCircumstances(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>
sink) {
- ResultSet result =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet result =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -482,7 +329,9 @@ class CassandraConnectorITCase
protected void verifyResultsDataPersistenceUponMissedNotify(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>
sink) {
- ResultSet result =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet result =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -500,7 +349,9 @@ class CassandraConnectorITCase
protected void verifyResultsDataDiscardingUponRestore(
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>
sink) {
- ResultSet result =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet result =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 20; x++) {
list.add(x);
@@ -534,7 +385,9 @@ class CassandraConnectorITCase
}
ArrayList<Integer> actual = new ArrayList<>();
- ResultSet result =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet result =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
for (com.datastax.driver.core.Row s : result) {
actual.add(s.getInt(TUPLE_COUNTER_FIELD));
@@ -544,18 +397,24 @@ class CassandraConnectorITCase
assertThat(actual.toArray()).isEqualTo(expected.toArray());
}
- @TestTemplate
+ @Test
void testCassandraCommitter() throws Exception {
String jobID = new JobID().toString();
- CassandraCommitter cc1 = new CassandraCommitter(builderForReading,
"flink_auxiliary_cc");
+ CassandraCommitter cc1 =
+ new CassandraCommitter(
+ cassandraTestEnvironment.getBuilderForReading(),
"flink_auxiliary_cc");
cc1.setJobId(jobID);
cc1.setOperatorId("operator");
- CassandraCommitter cc2 = new CassandraCommitter(builderForReading,
"flink_auxiliary_cc");
+ CassandraCommitter cc2 =
+ new CassandraCommitter(
+ cassandraTestEnvironment.getBuilderForReading(),
"flink_auxiliary_cc");
cc2.setJobId(jobID);
cc2.setOperatorId("operator");
- CassandraCommitter cc3 = new CassandraCommitter(builderForReading,
"flink_auxiliary_cc");
+ CassandraCommitter cc3 =
+ new CassandraCommitter(
+ cassandraTestEnvironment.getBuilderForReading(),
"flink_auxiliary_cc");
cc3.setJobId(jobID);
cc3.setOperatorId("operator1");
@@ -582,7 +441,9 @@ class CassandraConnectorITCase
cc2.close();
cc3.close();
- cc1 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
+ cc1 =
+ new CassandraCommitter(
+ cassandraTestEnvironment.getBuilderForReading(),
"flink_auxiliary_cc");
cc1.setJobId(jobID);
cc1.setOperatorId("operator");
@@ -600,10 +461,12 @@ class CassandraConnectorITCase
// At-least-once Tests
// ------------------------------------------------------------------------
- @TestTemplate
+ @Test
void testCassandraTupleAtLeastOnceSink() throws Exception {
CassandraTupleSink<Tuple3<String, Integer, Integer>> sink =
- new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY),
builderForWriting);
+ new CassandraTupleSink<>(
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
try {
sink.open(new Configuration());
for (Tuple3<String, Integer, Integer> value : collection) {
@@ -613,15 +476,19 @@ class CassandraConnectorITCase
sink.close();
}
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
assertThat(rs.all()).hasSize(20);
}
- @TestTemplate
+ @Test
void testCassandraRowAtLeastOnceSink() throws Exception {
CassandraRowSink sink =
new CassandraRowSink(
- FIELD_TYPES.length,
injectTableName(INSERT_DATA_QUERY), builderForWriting);
+ FIELD_TYPES.length,
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
try {
sink.open(new Configuration());
for (Row value : rowCollection) {
@@ -631,26 +498,32 @@ class CassandraConnectorITCase
sink.close();
}
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
assertThat(rs.all()).hasSize(20);
}
- @TestTemplate
+ @Test
void testCassandraPojoAtLeastOnceSink() throws Exception {
final Class<? extends Pojo> annotatedPojoClass =
annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
writePojos(annotatedPojoClass, null);
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
assertThat(rs.all()).hasSize(20);
}
- @TestTemplate
+ @Test
void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws
Exception {
final Class<? extends Pojo> annotatedPojoClass =
annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
writePojos(annotatedPojoClass, KEYSPACE);
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
assertThat(rs.all()).hasSize(20);
}
@@ -658,7 +531,11 @@ class CassandraConnectorITCase
throws Exception {
final Constructor<T> pojoConstructor =
getPojoConstructor(annotatedPojoClass);
CassandraPojoSink<T> sink =
- new CassandraPojoSink<>(annotatedPojoClass, builderForWriting,
null, keyspace);
+ new CassandraPojoSink<>(
+ annotatedPojoClass,
+ cassandraTestEnvironment.getBuilderForWriting(),
+ null,
+ keyspace);
try {
sink.open(new Configuration());
for (int x = 0; x < 20; x++) {
@@ -669,7 +546,7 @@ class CassandraConnectorITCase
}
}
- @TestTemplate
+ @Test
void testCassandraTableSink() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
@@ -682,7 +559,8 @@ class CassandraConnectorITCase
.registerTableSinkInternal(
"cassandraTable",
new CassandraAppendTableSink(
- builderForWriting,
injectTableName(INSERT_DATA_QUERY))
+
cassandraTestEnvironment.getBuilderForWriting(),
+ injectTableName(INSERT_DATA_QUERY))
.configure(
new String[] {"f0", "f1", "f2"},
new TypeInformation[] {
@@ -691,7 +569,9 @@ class CassandraConnectorITCase
tEnv.sqlQuery("select * from
testFlinkTable").executeInsert("cassandraTable").await();
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
// validate that all input was correctly written to Cassandra
List<Row> input = new ArrayList<>(rowCollection);
@@ -710,23 +590,16 @@ class CassandraConnectorITCase
private static int retrialsCount = 0;
- @TestTemplate
- void testRetrial() {
- annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
- if (retrialsCount < 2) {
- retrialsCount++;
- throw new NoHostAvailableException(new HashMap<>());
- }
- }
-
- @TestTemplate
+ @Test
void testCassandraBatchPojoFormat() throws Exception {
final Class<? extends Pojo> annotatedPojoClass =
annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
final List<? extends Pojo> pojos =
writePojosWithOutputFormat(annotatedPojoClass);
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
assertThat(rs.all()).hasSize(20);
final List<? extends Pojo> result =
readPojosWithInputFormat(annotatedPojoClass);
@@ -739,11 +612,12 @@ class CassandraConnectorITCase
.isEqualTo(pojos);
}
- @TestTemplate
+ @Test
void testCassandraBatchTupleFormat() throws Exception {
OutputFormat<Tuple3<String, Integer, Integer>> sink =
new CassandraTupleOutputFormat<>(
- injectTableName(INSERT_DATA_QUERY), builderForWriting);
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
try {
sink.configure(new Configuration());
sink.open(0, 1);
@@ -756,7 +630,8 @@ class CassandraConnectorITCase
sink =
new CassandraTupleOutputFormat<>(
- injectTableName(INSERT_DATA_QUERY), builderForWriting);
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
try {
sink.configure(new Configuration());
sink.open(0, 1);
@@ -768,7 +643,9 @@ class CassandraConnectorITCase
}
InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source =
- new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY),
builderForReading);
+ new CassandraInputFormat<>(
+ injectTableName(SELECT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForReading());
List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
try {
source.configure(new Configuration());
@@ -783,10 +660,12 @@ class CassandraConnectorITCase
assertThat(result).hasSize(20);
}
- @TestTemplate
+ @Test
void testCassandraBatchRowFormat() throws Exception {
OutputFormat<Row> sink =
- new
CassandraRowOutputFormat(injectTableName(INSERT_DATA_QUERY), builderForWriting);
+ new CassandraRowOutputFormat(
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
try {
sink.configure(new Configuration());
sink.open(0, 1);
@@ -798,12 +677,14 @@ class CassandraConnectorITCase
sink.close();
}
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSameSizeAs(rowCollection);
}
- @TestTemplate
+ @Test
void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws
Exception {
Class<scala.Tuple1<String>> c =
(Class<scala.Tuple1<String>>) new
scala.Tuple1<>("hello").getClass();
@@ -833,11 +714,12 @@ class CassandraConnectorITCase
assertThat(sinkBuilder).isInstanceOf(CassandraSink.CassandraScalaProductSinkBuilder.class);
}
- @TestTemplate
+ @Test
void testCassandraScalaTupleAtLeastSink() throws Exception {
CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink
=
new CassandraScalaProductSink<>(
- injectTableName(INSERT_DATA_QUERY), builderForWriting);
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting());
List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection =
new ArrayList<>(20);
for (int i = 0; i < 20; i++) {
@@ -852,7 +734,9 @@ class CassandraConnectorITCase
sink.close();
}
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSameSizeAs(scalaTupleCollection);
@@ -866,13 +750,15 @@ class CassandraConnectorITCase
assertThat(scalaTupleCollection).isEmpty();
}
- @TestTemplate
+ @Test
void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
CassandraSinkBaseConfig config =
CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink
=
new CassandraScalaProductSink<>(
- injectTableName(INSERT_DATA_QUERY), builderForWriting,
config);
+ injectTableName(INSERT_DATA_QUERY),
+ cassandraTestEnvironment.getBuilderForWriting(),
+ config);
String id = UUID.randomUUID().toString();
Integer counter = 1;
@@ -892,7 +778,9 @@ class CassandraConnectorITCase
sink.close();
}
- ResultSet rs =
session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
+ ResultSet rs =
+ cassandraTestEnvironment.executeRequestWithTimeout(
+ injectTableName(SELECT_DATA_QUERY));
List<com.datastax.driver.core.Row> rows = rs.all();
assertThat(rows).hasSize(1);
// Since nulls are ignored, we should be reading one complete record
@@ -905,8 +793,4 @@ class CassandraConnectorITCase
.isEqualTo(new scala.Tuple3<>(id, counter, batchId));
}
}
-
- private static Statement requestWithTimeout(String query) {
- return new
SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
- }
}