This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26fb7a269f5fe3fc5dd0d52d88afb2915e452d1b
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Jan 20 16:49:58 2022 +0100

    [FLINK-25674][connectors][cassandra][tests] Drop tables after each test so that tests are idempotent when retried, and add the related test
---
 .../cassandra/CassandraConnectorITCase.java        | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 3b4b314..d94bec9 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -57,6 +57,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -72,6 +73,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -143,6 +145,8 @@ public class CassandraConnectorITCase
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
                     + " (id text PRIMARY KEY, counter int, batch_id int);";
+    private static final String DROP_TABLE_QUERY =
+            "DROP TABLE IF EXISTS flink." + TABLE_NAME_VARIABLE + " ;";
     private static final String INSERT_DATA_QUERY =
             "INSERT INTO flink."
                     + TABLE_NAME_VARIABLE
@@ -215,6 +219,18 @@ public class CassandraConnectorITCase
         session.execute(injectTableName(CREATE_TABLE_QUERY));
     }
 
+    @After
+    public void dropTables() {
+        // Drop every table a test may create: this runs after each test, so a
+        // retried test can re-run CREATE TABLE; IF EXISTS skips absent tables.
+        session.execute(
+                DROP_TABLE_QUERY.replace(
+                        TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
+        session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
+        session.execute(
+                DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"testPojoNoAnnotatedKeyspace"));
+    }
+
     @AfterClass
     public static void closeCassandra() {
         if (session != null) {
@@ -507,6 +523,19 @@ public class CassandraConnectorITCase
                 "The input data was not completely written to Cassandra", 
input.isEmpty());
     }
 
+    private static int retrialsCount = 0; // static: count persists across runs
+
+    @Test // NOTE(review): presumably retried by a rule outside this hunk
+    public void testRetrialAndDropTables() {
+        session.execute( // CREATE TABLE has no IF NOT EXISTS guard
+                CREATE_TABLE_QUERY.replace(
+                        TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
+        if (retrialsCount < 2) { // throw on the first two runs, pass on the third
+            retrialsCount++;
+            throw new NoHostAvailableException(new HashMap<>());
+        }
+    }
+
     @Test
     public void testCassandraBatchPojoFormat() throws Exception {
 

Reply via email to