This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 26fb7a269f5fe3fc5dd0d52d88afb2915e452d1b Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Jan 20 16:49:58 2022 +0100 [FLINK-25674][connectors][cassandra][tests] Add drop tables to be idempotent in case of retrials and the related test --- .../cassandra/CassandraConnectorITCase.java | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 3b4b314..d94bec9 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -57,6 +57,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -72,6 +73,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.UUID; @@ -143,6 +145,8 @@ public class CassandraConnectorITCase "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);"; + private static final String DROP_TABLE_QUERY = + "DROP TABLE IF EXISTS flink." + TABLE_NAME_VARIABLE + " ;"; private static final String INSERT_DATA_QUERY = "INSERT INTO flink." 
+ TABLE_NAME_VARIABLE @@ -215,6 +219,18 @@ public class CassandraConnectorITCase session.execute(injectTableName(CREATE_TABLE_QUERY)); } + @After + public void dropTables() { + // need to drop tables in case of retrials. Need to drop all the tables + // that are created in test because this method is executed with every test + session.execute( + DROP_TABLE_QUERY.replace( + TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); + session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test")); + session.execute( + DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace")); + } + @AfterClass public static void closeCassandra() { if (session != null) { @@ -507,6 +523,19 @@ public class CassandraConnectorITCase "The input data was not completely written to Cassandra", input.isEmpty()); } + private static int retrialsCount = 0; + + @Test + public void testRetrialAndDropTables() { + session.execute( + CREATE_TABLE_QUERY.replace( + TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); + if (retrialsCount < 2) { + retrialsCount++; + throw new NoHostAvailableException(new HashMap<>()); + } + } + @Test public void testCassandraBatchPojoFormat() throws Exception {
