This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69075ed4da9d97e5f2b3db652c5ee9d7ad20eb26
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Jan 18 12:22:06 2022 +0100

    [FLINK-25674][connectors][cassandra][tests] use constants instead of string 
literals to avoid copy/paste
---
 .../cassandra/CassandraConnectorITCase.java        | 75 +++++++++++++++-------
 1 file changed, 52 insertions(+), 23 deletions(-)

diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index d94bec9..e4d83f3 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -103,6 +103,8 @@ public class CassandraConnectorITCase
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = 
LoggerFactory.getLogger(CassandraConnectorITCase.class);
+    private static final String TABLE_POJO = "test";
+    private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = 
"testPojoNoAnnotatedKeyspace";
 
     @Rule public final RetryRule retryRule = new RetryRule();
 
@@ -138,21 +140,43 @@ public class CassandraConnectorITCase
 
     private static final String TABLE_NAME_PREFIX = "flink_";
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
+    private static final String KEYSPACE = "flink";
+    private static final String TUPLE_ID_FIELD = "id";
+    private static final String TUPLE_COUNTER_FIELD = "counter";
+    private static final String TUPLE_BATCHID_FIELD = "batch_id";
     private static final String CREATE_KEYSPACE_QUERY =
-            "CREATE KEYSPACE flink WITH replication= 
{'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS 
flink ;";
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
+    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS 
" + KEYSPACE + " ;";
+    private static final String DROP_TABLE_QUERY =
+            "DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE_NAME_VARIABLE + " 
;";
     private static final String CREATE_TABLE_QUERY =
-            "CREATE TABLE flink."
+            "CREATE TABLE "
+                    + KEYSPACE
+                    + "."
                     + TABLE_NAME_VARIABLE
-                    + " (id text PRIMARY KEY, counter int, batch_id int);";
-    private static final String DROP_TABLE_QUERY =
-            "DROP TABLE IF EXISTS flink." + TABLE_NAME_VARIABLE + " ;";
+                    + " ("
+                    + TUPLE_ID_FIELD
+                    + " text PRIMARY KEY, "
+                    + TUPLE_COUNTER_FIELD
+                    + " int, "
+                    + TUPLE_BATCHID_FIELD
+                    + " int);";
     private static final String INSERT_DATA_QUERY =
-            "INSERT INTO flink."
+            "INSERT INTO "
+                    + KEYSPACE
+                    + "."
                     + TABLE_NAME_VARIABLE
-                    + " (id, counter, batch_id) VALUES (?, ?, ?)";
+                    + " ("
+                    + TUPLE_ID_FIELD
+                    + ", "
+                    + TUPLE_COUNTER_FIELD
+                    + ", "
+                    + TUPLE_BATCHID_FIELD
+                    + ") VALUES (?, ?, ?)";
     private static final String SELECT_DATA_QUERY =
-            "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+            "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_VARIABLE + ';';
 
     private static final Random random = new Random();
     private int tableID;
@@ -226,9 +250,9 @@ public class CassandraConnectorITCase
         session.execute(
                 DROP_TABLE_QUERY.replace(
                         TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
-        session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
+        session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO));
         session.execute(
-                DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"testPojoNoAnnotatedKeyspace"));
+                DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
     }
 
     @AfterClass
@@ -278,7 +302,7 @@ public class CassandraConnectorITCase
         }
 
         for (com.datastax.driver.core.Row s : result) {
-            list.remove(new Integer(s.getInt("counter")));
+            list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
         }
         Assert.assertTrue(
                 "The following ID's were not found in the ResultSet: " + 
list.toString(),
@@ -296,7 +320,7 @@ public class CassandraConnectorITCase
         }
 
         for (com.datastax.driver.core.Row s : result) {
-            list.remove(new Integer(s.getInt("counter")));
+            list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
         }
         Assert.assertTrue(
                 "The following ID's were not found in the ResultSet: " + 
list.toString(),
@@ -317,7 +341,7 @@ public class CassandraConnectorITCase
         }
 
         for (com.datastax.driver.core.Row s : result) {
-            list.remove(new Integer(s.getInt("counter")));
+            list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
         }
         Assert.assertTrue(
                 "The following ID's were not found in the ResultSet: " + 
list.toString(),
@@ -344,7 +368,7 @@ public class CassandraConnectorITCase
         ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
 
         for (com.datastax.driver.core.Row s : result) {
-            actual.add(s.getInt("counter"));
+            actual.add(s.getInt(TUPLE_COUNTER_FIELD));
         }
 
         Collections.sort(actual);
@@ -444,7 +468,7 @@ public class CassandraConnectorITCase
 
     @Test
     public void testCassandraPojoAtLeastOnceSink() throws Exception {
-        session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"test"));
+        session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO));
 
         CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, 
builderForWriting);
         try {
@@ -456,17 +480,17 @@ public class CassandraConnectorITCase
             sink.close();
         }
 
-        ResultSet rs = 
session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
+        ResultSet rs = 
session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
         Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
     public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws 
Exception {
         session.execute(
-                CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"testPojoNoAnnotatedKeyspace"));
+                CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
 
         CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
-                new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, 
builderForWriting, "flink");
+                new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, 
builderForWriting, KEYSPACE);
         try {
             sink.open(new Configuration());
             for (int x = 0; x < 20; x++) {
@@ -479,7 +503,7 @@ public class CassandraConnectorITCase
         ResultSet rs =
                 session.execute(
                         SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, 
"testPojoNoAnnotatedKeyspace"));
+                                TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
         Assert.assertEquals(20, rs.all().size());
     }
 
@@ -574,7 +598,8 @@ public class CassandraConnectorITCase
 
         InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source =
                 new CassandraPojoInputFormat<>(
-                        SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"),
+                        SELECT_DATA_QUERY.replace(
+                                TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME),
                         builderForReading,
                         CustomCassandraAnnotatedPojo.class);
         List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
@@ -721,7 +746,9 @@ public class CassandraConnectorITCase
         for (com.datastax.driver.core.Row row : rows) {
             scalaTupleCollection.remove(
                     new scala.Tuple3<>(
-                            row.getString("id"), row.getInt("counter"), 
row.getInt("batch_id")));
+                            row.getString(TUPLE_ID_FIELD),
+                            row.getInt(TUPLE_COUNTER_FIELD),
+                            row.getInt(TUPLE_BATCHID_FIELD)));
         }
         Assert.assertEquals(0, scalaTupleCollection.size());
     }
@@ -760,7 +787,9 @@ public class CassandraConnectorITCase
             Assert.assertEquals(
                     new scala.Tuple3<>(id, counter, batchId),
                     new scala.Tuple3<>(
-                            row.getString("id"), row.getInt("counter"), 
row.getInt("batch_id")));
+                            row.getString(TUPLE_ID_FIELD),
+                            row.getInt(TUPLE_COUNTER_FIELD),
+                            row.getInt(TUPLE_BATCHID_FIELD)));
         }
     }
 }

Reply via email to