This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8488368b86a99a064446ca74e775b67ffff0b94a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Jan 28 13:05:26 2022 +0100

    [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos
---
 .../cassandra/CustomCassandraAnnotatedPojo.java    |  70 -----
 .../cassandra/example/BatchPojoExample.java        |  17 +-
 .../connectors/cassandra/example/Pojo.java}        |  13 +-
 .../cassandra/CassandraConnectorITCase.java        | 333 ++++++++++++---------
 .../flink/streaming/connectors/cassandra/Pojo.java |   7 +-
 5 files changed, 215 insertions(+), 225 deletions(-)

diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
deleted file mode 100644
index ad21265..0000000
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-/** Example of Cassandra Annotated POJO class for use with {@link 
CassandraPojoInputFormat}. */
-@Table(name = CustomCassandraAnnotatedPojo.TABLE_NAME, keyspace = "flink")
-public class CustomCassandraAnnotatedPojo {
-
-    public static final String TABLE_NAME = "batches";
-
-    @Column(name = "id")
-    private String id;
-
-    @Column(name = "counter")
-    private Integer counter;
-
-    @Column(name = "batch_id")
-    private Integer batchId;
-
-    /** Necessary for the driver's mapper instantiation. */
-    public CustomCassandraAnnotatedPojo() {}
-
-    public CustomCassandraAnnotatedPojo(String id, Integer counter, Integer 
batchId) {
-        this.id = id;
-        this.counter = counter;
-        this.batchId = batchId;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public Integer getCounter() {
-        return counter;
-    }
-
-    public void setCounter(Integer counter) {
-        this.counter = counter;
-    }
-
-    public Integer getBatchId() {
-        return batchId;
-    }
-
-    public void setBatchId(Integer batchId) {
-        this.batchId = batchId;
-    }
-}
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index fb6733e..764c001 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
-import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
 import com.datastax.driver.core.Cluster;
@@ -50,16 +49,12 @@ public class BatchPojoExample {
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos =
+        List<Pojo> customCassandraAnnotatedPojos =
                 IntStream.range(0, 20)
-                        .mapToObj(
-                                x ->
-                                        new CustomCassandraAnnotatedPojo(
-                                                UUID.randomUUID().toString(), 
x, 0))
+                        .mapToObj(x -> new Pojo(UUID.randomUUID().toString(), 
x, 0))
                         .collect(Collectors.toList());
 
-        DataSet<CustomCassandraAnnotatedPojo> dataSet =
-                env.fromCollection(customCassandraAnnotatedPojos);
+        DataSet<Pojo> dataSet = 
env.fromCollection(customCassandraAnnotatedPojos);
 
         ClusterBuilder clusterBuilder =
                 new ClusterBuilder() {
@@ -74,7 +69,7 @@ public class BatchPojoExample {
         dataSet.output(
                 new CassandraPojoOutputFormat<>(
                         clusterBuilder,
-                        CustomCassandraAnnotatedPojo.class,
+                        Pojo.class,
                         () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)}));
 
         env.execute("Write");
@@ -82,12 +77,12 @@ public class BatchPojoExample {
         /*
          *     This is for the purpose of showing an example of creating a 
DataSet using CassandraPojoInputFormat.
          */
-        DataSet<CustomCassandraAnnotatedPojo> inputDS =
+        DataSet<Pojo> inputDS =
                 env.createInput(
                         new CassandraPojoInputFormat<>(
                                 SELECT_QUERY,
                                 clusterBuilder,
-                                CustomCassandraAnnotatedPojo.class,
+                                Pojo.class,
                                 () ->
                                         new Mapper.Option[] {
                                             
Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
similarity index 84%
rename from 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
rename to 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
index aa74310..559f107 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.cassandra;
+package org.apache.flink.batch.connectors.cassandra.example;
 
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
-/** Test Pojo with DataStax annotations used (no keyspace). */
-@Table(name = "testPojoNoAnnotatedKeyspace")
-public class PojoNoAnnotatedKeyspace implements Serializable {
+/** Test Pojo with DataStax annotations used. */
+@Table(keyspace = "flink", name = "batches")
+public class Pojo implements Serializable {
 
     private static final long serialVersionUID = 1038054554690916991L;
 
@@ -37,7 +37,10 @@ public class PojoNoAnnotatedKeyspace implements Serializable 
{
     @Column(name = "batch_id")
     private int batchID;
 
-    public PojoNoAnnotatedKeyspace(String id, int counter, int batchID) {
+    // required for deserialization
+    public Pojo() {}
+
+    public Pojo(String id, int counter, int batchID) {
         this.id = id;
         this.counter = counter;
         this.batchID = batchID;
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 4c24942..09212c5 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
-import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -58,6 +57,10 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.annotations.Table;
+import net.bytebuddy.ByteBuddy;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -74,19 +77,18 @@ import 
org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.images.builder.Transferable;
 
 import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
@@ -108,8 +110,6 @@ public class CassandraConnectorITCase
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = 
LoggerFactory.getLogger(CassandraConnectorITCase.class);
     private static final Slf4jLogConsumer LOG_CONSUMER = new 
Slf4jLogConsumer(LOG);
-    private static final String TABLE_POJO = "test";
-    private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = 
"testPojoNoAnnotatedKeyspace";
 
     @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
@@ -206,6 +206,61 @@ public class CassandraConnectorITCase
         }
     }
 
+    private static Class<? extends Pojo> annotatePojoWithTable(String 
keyspace, String tableName) {
+        return new ByteBuddy()
+                .redefine(Pojo.class)
+                .name("org.apache.flink.streaming.connectors.cassandra.Pojo" + 
tableName)
+                .annotateType(createTableAnnotation(keyspace, tableName))
+                .make()
+                .load(Pojo.class.getClassLoader())
+                .getLoaded();
+    }
+
+    @NotNull
+    private static Table createTableAnnotation(String keyspace, String 
tableName) {
+        return new Table() {
+
+            @Override
+            public String keyspace() {
+                return keyspace;
+            }
+
+            @Override
+            public String name() {
+                return tableName;
+            }
+
+            @Override
+            public boolean caseSensitiveKeyspace() {
+                return false;
+            }
+
+            @Override
+            public boolean caseSensitiveTable() {
+                return false;
+            }
+
+            @Override
+            public String writeConsistency() {
+                return "";
+            }
+
+            @Override
+            public String readConsistency() {
+                return "";
+            }
+
+            @Override
+            public Class<? extends Annotation> annotationType() {
+                return Table.class;
+            }
+        };
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utility methods
+    // ------------------------------------------------------------------------
+
     public static CassandraContainer createCassandraContainer() {
         CassandraContainer cassandra = new 
CassandraContainer(DockerImageVersions.CASSANDRA_3);
         cassandra.withJmxReporting(false);
@@ -213,6 +268,87 @@ public class CassandraConnectorITCase
         return cassandra;
     }
 
+    private static void raiseCassandraRequestsTimeouts() {
+        try {
+            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+            CASSANDRA_CONTAINER.copyFileFromContainer(
+                    "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
+            String configuration =
+                    new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
+            String patchedConfiguration =
+                    configuration
+                            .replaceAll(
+                                    "request_timeout_in_ms: [0-9]+", 
"request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "read_request_timeout_in_ms: [0-9]+",
+                                    "read_request_timeout_in_ms: 15000")
+                            .replaceAll(
+                                    "write_request_timeout_in_ms: [0-9]+",
+                                    "write_request_timeout_in_ms: 6000");
+            CASSANDRA_CONTAINER.copyFileToContainer(
+                    
Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
+                    "/etc/cassandra/cassandra.yaml");
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to open Cassandra configuration 
file ", e);
+        }
+    }
+
+    private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) {
+        final CassandraPojoInputFormat<T> source =
+                new CassandraPojoInputFormat<>(
+                        injectTableName(SELECT_DATA_QUERY), builderForReading, 
annotatedPojoClass);
+        List<T> result = new ArrayList<>();
+
+        try {
+            source.configure(new Configuration());
+            source.open(null);
+            while (!source.reachedEnd()) {
+                T temp = source.nextRecord(null);
+                result.add(temp);
+            }
+        } finally {
+            source.close();
+        }
+        return result;
+    }
+
+    private <T> List<T> writePojosWithOutputFormat(Class<T> 
annotatedPojoClass) throws Exception {
+        final CassandraPojoOutputFormat<T> sink =
+                new CassandraPojoOutputFormat<>(
+                        builderForWriting,
+                        annotatedPojoClass,
+                        () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
+
+        final Constructor<T> pojoConstructor = 
getPojoConstructor(annotatedPojoClass);
+        List<T> pojos = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            
pojos.add(pojoConstructor.newInstance(UUID.randomUUID().toString(), i, 0));
+        }
+        try {
+            sink.configure(new Configuration());
+            sink.open(0, 1);
+            for (T pojo : pojos) {
+                sink.writeRecord(pojo);
+            }
+        } finally {
+            sink.close();
+        }
+        return pojos;
+    }
+
+    private <T> Constructor<T> getPojoConstructor(Class<T> annotatedPojoClass)
+            throws NoSuchMethodException {
+        return annotatedPojoClass.getConstructor(String.class, Integer.TYPE, 
Integer.TYPE);
+    }
+
+    private String injectTableName(String target) {
+        return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + 
tableID);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Tests initialization
+    // ------------------------------------------------------------------------
+
     @BeforeClass
     public static void startAndInitializeCassandra() {
         raiseCassandraRequestsTimeouts();
@@ -249,46 +385,6 @@ public class CassandraConnectorITCase
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_NAME_PREFIX + "initial"));
     }
 
-    private static void raiseCassandraRequestsTimeouts() {
-        try {
-            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
-            CASSANDRA_CONTAINER.copyFileFromContainer(
-                    "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
-            String configuration =
-                    new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
-            String patchedConfiguration =
-                    configuration
-                            .replaceAll(
-                                    "request_timeout_in_ms: [0-9]+", 
"request_timeout_in_ms: 30000")
-                            .replaceAll(
-                                    "read_request_timeout_in_ms: [0-9]+",
-                                    "read_request_timeout_in_ms: 15000")
-                            .replaceAll(
-                                    "write_request_timeout_in_ms: [0-9]+",
-                                    "write_request_timeout_in_ms: 6000");
-            CASSANDRA_CONTAINER.copyFileToContainer(
-                    
Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
-                    "/etc/cassandra/cassandra.yaml");
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to open Cassandra configuration 
file ", e);
-        }
-    }
-
-    @Test
-    public void testRaiseCassandraRequestsTimeouts() throws IOException {
-        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
-        // do not change the container conf twice, just assert that it was 
indeed changed in the
-        // container
-        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
-        CASSANDRA_CONTAINER.copyFileFromContainer(
-                "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
-        final String configuration =
-                new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
-        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
-        assertTrue(configuration.contains("read_request_timeout_in_ms: 
15000"));
-        assertTrue(configuration.contains("write_request_timeout_in_ms: 
6000"));
-    }
-
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);
@@ -299,12 +395,8 @@ public class CassandraConnectorITCase
     public void dropTables() {
         // need to drop tables in case of retrials. Need to drop all the tables
         // that are created in test because this method is executed with every 
test
-        session.execute(
-                DROP_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
-        session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO));
-        session.execute(
-                DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+        session.execute(DROP_KEYSPACE_QUERY);
+        session.execute(CREATE_KEYSPACE_QUERY);
     }
 
     @AfterClass
@@ -319,6 +411,34 @@ public class CassandraConnectorITCase
     }
 
     // ------------------------------------------------------------------------
+    //  Technical Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testAnnotatePojoWithTable() {
+        final String tableName = TABLE_NAME_PREFIX + tableID;
+
+        final Class<? extends Pojo> annotatedPojoClass = 
annotatePojoWithTable(KEYSPACE, tableName);
+        final Table pojoTableAnnotation = 
annotatedPojoClass.getAnnotation(Table.class);
+        assertTrue(pojoTableAnnotation.name().contains(tableName));
+    }
+
+    @Test
+    public void testRaiseCassandraRequestsTimeouts() throws IOException {
+        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
+        // do not change the container conf twice, just assert that it was 
indeed changed in the
+        // container
+        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+        CASSANDRA_CONTAINER.copyFileFromContainer(
+                "/etc/cassandra/cassandra.yaml", 
configurationPath.toAbsolutePath().toString());
+        final String configuration =
+                new String(Files.readAllBytes(configurationPath), 
StandardCharsets.UTF_8);
+        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
+        assertTrue(configuration.contains("read_request_timeout_in_ms: 
15000"));
+        assertTrue(configuration.contains("write_request_timeout_in_ms: 
6000"));
+    }
+
+    // ------------------------------------------------------------------------
     //  Exactly-once Tests
     // ------------------------------------------------------------------------
 
@@ -520,43 +640,36 @@ public class CassandraConnectorITCase
 
     @Test
     public void testCassandraPojoAtLeastOnceSink() throws Exception {
-        session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO));
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, null);
 
-        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, 
builderForWriting);
-        try {
-            sink.open(new Configuration());
-            for (int x = 0; x < 20; x++) {
-                sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = 
session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
         Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
     public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws 
Exception {
-        session.execute(
-                CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, KEYSPACE);
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+        Assert.assertEquals(20, rs.all().size());
+    }
 
-        CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
-                new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, 
builderForWriting, KEYSPACE);
+    private <T> void writePojos(Class<T> annotatedPojoClass, @Nullable String 
keyspace)
+            throws Exception {
+        final Constructor<T> pojoConstructor = 
getPojoConstructor(annotatedPojoClass);
+        CassandraPojoSink<T> sink =
+                new CassandraPojoSink<>(annotatedPojoClass, builderForWriting, 
null, keyspace);
         try {
             sink.open(new Configuration());
             for (int x = 0; x < 20; x++) {
-                sink.send(new 
PojoNoAnnotatedKeyspace(UUID.randomUUID().toString(), x, 0));
+                
sink.send(pojoConstructor.newInstance(UUID.randomUUID().toString(), x, 0));
             }
-
         } finally {
             sink.close();
         }
-        ResultSet rs =
-                session.execute(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, 
TABLE_POJO_NO_ANNOTATED_KEYSPACE));
-        Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
@@ -603,9 +716,9 @@ public class CassandraConnectorITCase
 
     @Test
     public void testRetrialAndDropTables() {
-        session.execute(
-                CREATE_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
+        // should not fail with table exists upon retrial
+        // as @After method that truncate the keyspace is called upon retrials.
+        annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
         if (retrialsCount < 2) {
             retrialsCount++;
             throw new NoHostAvailableException(new HashMap<>());
@@ -615,64 +728,16 @@ public class CassandraConnectorITCase
     @Test
     public void testCassandraBatchPojoFormat() throws Exception {
 
-        session.execute(
-                CREATE_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
-
-        OutputFormat<CustomCassandraAnnotatedPojo> sink =
-                new CassandraPojoOutputFormat<>(
-                        builderForWriting,
-                        CustomCassandraAnnotatedPojo.class,
-                        () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
 
-        List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos =
-                IntStream.range(0, 20)
-                        .mapToObj(
-                                x ->
-                                        new CustomCassandraAnnotatedPojo(
-                                                UUID.randomUUID().toString(), 
x, 0))
-                        .collect(Collectors.toList());
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (CustomCassandraAnnotatedPojo customCassandraAnnotatedPojo :
-                    customCassandraAnnotatedPojos) {
-                sink.writeRecord(customCassandraAnnotatedPojo);
-            }
-        } finally {
-            sink.close();
-        }
-        ResultSet rs =
-                session.execute(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME));
+        final List<? extends Pojo> pojos = 
writePojosWithOutputFormat(annotatedPojoClass);
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
         Assert.assertEquals(20, rs.all().size());
 
-        InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source =
-                new CassandraPojoInputFormat<>(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, 
CustomCassandraAnnotatedPojo.TABLE_NAME),
-                        builderForReading,
-                        CustomCassandraAnnotatedPojo.class);
-        List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
-
-        try {
-            source.configure(new Configuration());
-            source.open(null);
-            while (!source.reachedEnd()) {
-                CustomCassandraAnnotatedPojo temp = source.nextRecord(null);
-                result.add(temp);
-            }
-        } finally {
-            source.close();
-        }
-
+        final List<? extends Pojo> result = 
readPojosWithInputFormat(annotatedPojoClass);
         Assert.assertEquals(20, result.size());
-        
result.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
-        customCassandraAnnotatedPojos.sort(
-                
Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
-
-        assertThat(result, 
samePropertyValuesAs(customCassandraAnnotatedPojos));
+        assertThat(result, samePropertyValuesAs(pojos));
     }
 
     @Test
@@ -738,10 +803,6 @@ public class CassandraConnectorITCase
         Assert.assertEquals(rowCollection.size(), rows.size());
     }
 
-    private String injectTableName(String target) {
-        return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + 
tableID);
-    }
-
     @Test
     public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() 
throws Exception {
         Class<scala.Tuple1<String>> c =
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
index ce2c213..2efde66 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
@@ -18,12 +18,10 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
-/** Test Pojo with DataStax annotations used. */
-@Table(keyspace = "flink", name = "test")
+/** Test Pojo with DataStax annotations created dynamically. */
 public class Pojo implements Serializable {
 
     private static final long serialVersionUID = 1038054554690916991L;
@@ -37,6 +35,9 @@ public class Pojo implements Serializable {
     @Column(name = "batch_id")
     private int batchID;
 
+    // required for deserialization
+    public Pojo() {}
+
     public Pojo(String id, int counter, int batchID) {
         this.id = id;
         this.counter = counter;

Reply via email to