zentol commented on a change in pull request #18544:
URL: https://github.com/apache/flink/pull/18544#discussion_r797624076



##########
File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
##########
@@ -206,6 +209,79 @@ protected Cluster buildCluster(Cluster.Builder builder) {
         }
     }
 
+    private static String setTableNameInPojo(Class<?> claz, String testName) throws Exception {
+        Table clazTableAnnotation = claz.getAnnotation(Table.class);
+        final String newName = clazTableAnnotation.name() + testName;
+        // BEWARE it is JDK 8 specific
+        final Method annotationDataMethod = Class.class.getDeclaredMethod("annotationData", null);

Review comment:
       Have you looked into using a bytecode library such as Byte Buddy or ASM to achieve this,
   instead of relying on JDK 8 specific reflection?
   
   An UNTESTED prototype:
   ```
       @Test
       public void testCassandraPojoAtLeastOnceSink() throws Exception {
           final String someName = "test";
           session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, someName));
   
           final Class<? extends Pojo> load =
                   new ByteBuddy()
                           .subclass(Pojo.class)
                           .annotateType(createTableAnnotation(KEYSPACE, someName))
                           .make()
                           .load(getClass().getClassLoader())
                           .getLoaded();
   
           writeInstances(load);
   
           ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, someName));
           Assert.assertEquals(20, rs.all().size());
       }
   
       private Table createTableAnnotation(@Nullable String keySpace, String tableName) {
           return new Table() {
               @Override
               public Class<? extends Annotation> annotationType() {
                   return Table.class;
               }
   
               @Override
               public String keyspace() {
                   return keySpace;
               }
   
               @Override
               public String name() {
                   return tableName;
               }
   
               @Override
               public boolean caseSensitiveKeyspace() {
                   return false;
               }
   
               @Override
               public boolean caseSensitiveTable() {
                   return false;
               }
   
               @Override
               public String writeConsistency() {
                   return "";
               }
   
               @Override
               public String readConsistency() {
                   return "";
               }
           };
       }
   
       private <T> void writeInstances(Class<T> clazz) throws Exception {
           final Constructor<T> constructor =
                   clazz.getDeclaredConstructor(String.class, Integer.class, Integer.class);
           CassandraPojoSink<T> sink = new CassandraPojoSink<>(clazz, builderForWriting);
           try {
               sink.open(new Configuration());
               for (int x = 0; x < 20; x++) {
                   sink.send(constructor.newInstance(UUID.randomUUID().toString(), x, 0));
               }
           } finally {
               sink.close();
           }
       }
   ```

##########
File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
##########
@@ -21,11 +21,9 @@
 import com.datastax.driver.mapping.annotations.Table;
 
 /** Example of Cassandra Annotated POJO class for use with {@link CassandraPojoInputFormat}. */
-@Table(name = CustomCassandraAnnotatedPojo.TABLE_NAME, keyspace = "flink")
+@Table(name = "batches", keyspace = "flink")

Review comment:
       We should be able to remove this annotation (and the others) entirely, because we
   override them anyway.
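
   To illustrate why the annotation on the base POJO would become redundant, here is a minimal, library-free sketch. It assumes the mapper reads the table annotation from the concrete class it is handed, and that the annotation type is not marked `@Inherited`. The `Table` annotation, `Pojo`, and `GeneratedPojo` below are hypothetical stand-ins, not the driver's or the connector's real types; `GeneratedPojo` plays the role of the subclass a bytecode library would generate at runtime.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationOverrideSketch {

    // Hypothetical stand-in for the driver's @Table annotation (assumption: not @Inherited).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Table {
        String name();

        String keyspace() default "";
    }

    // Base POJO that carries no table annotation at all.
    public static class Pojo {}

    // Stand-in for the runtime-generated subclass; it declares its own annotation.
    @Table(name = "test", keyspace = "flink")
    public static class GeneratedPojo extends Pojo {}

    // Returns the table name declared on the given class, or null if it has no @Table.
    public static String tableNameOf(Class<?> clazz) {
        Table table = clazz.getAnnotation(Table.class);
        return table == null ? null : table.name();
    }

    public static void main(String[] args) {
        System.out.println(tableNameOf(Pojo.class)); // prints "null"
        System.out.println(tableNameOf(GeneratedPojo.class)); // prints "test"
    }
}
```

   Under these assumptions, only the annotation on the subclass is ever consulted, so the one on the base class could be dropped without changing what the tests see.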





