echauchot commented on a change in pull request #18544:
URL: https://github.com/apache/flink/pull/18544#discussion_r810097469



##########
File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
##########
@@ -520,43 +575,35 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
 
     @Test
     public void testCassandraPojoAtLeastOnceSink() throws Exception {
-        session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
-
-        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builderForWriting);
-        try {
-            sink.open(new Configuration());
-            for (int x = 0; x < 20; x++) {
-                sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-            }
-        } finally {
-            sink.close();
-        }
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, null);
 
-        ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
         Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
     public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
-        session.execute(
-                CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, KEYSPACE);
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+        Assert.assertEquals(20, rs.all().size());
+    }
 
-        CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
-                new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, KEYSPACE);
+    private <T> void writePojos(Class<T> annotatedPojoClass, String keyspace) throws Exception {

Review comment:
       Thanks, I forgot about that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to