echauchot commented on a change in pull request #18544:
URL: https://github.com/apache/flink/pull/18544#discussion_r810097469
##########
File path:
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
##########
@@ -520,43 +575,35 @@ public void testCassandraRowAtLeastOnceSink() throws
Exception {
@Test
public void testCassandraPojoAtLeastOnceSink() throws Exception {
- session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE,
TABLE_POJO));
-
- CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class,
builderForWriting);
- try {
- sink.open(new Configuration());
- for (int x = 0; x < 20; x++) {
- sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
- }
- } finally {
- sink.close();
- }
+ final Class<? extends Pojo> annotatedPojoClass =
+ annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
+ writePojos(annotatedPojoClass, null);
- ResultSet rs =
session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
}
@Test
public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws
Exception {
- session.execute(
- CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE,
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+ final Class<? extends Pojo> annotatedPojoClass =
+ annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
+ writePojos(annotatedPojoClass, KEYSPACE);
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ Assert.assertEquals(20, rs.all().size());
+ }
- CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
- new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class,
builderForWriting, KEYSPACE);
+ private <T> void writePojos(Class<T> annotatedPojoClass, String keyspace)
throws Exception {
Review comment:
thx, forgot
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]