This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8488368b86a99a064446ca74e775b67ffff0b94a Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Jan 28 13:05:26 2022 +0100 [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos --- .../cassandra/CustomCassandraAnnotatedPojo.java | 70 ----- .../cassandra/example/BatchPojoExample.java | 17 +- .../connectors/cassandra/example/Pojo.java} | 13 +- .../cassandra/CassandraConnectorITCase.java | 333 ++++++++++++--------- .../flink/streaming/connectors/cassandra/Pojo.java | 7 +- 5 files changed, 215 insertions(+), 225 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java deleted file mode 100644 index ad21265..0000000 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.batch.connectors.cassandra; - -import com.datastax.driver.mapping.annotations.Column; -import com.datastax.driver.mapping.annotations.Table; - -/** Example of Cassandra Annotated POJO class for use with {@link CassandraPojoInputFormat}. */ -@Table(name = CustomCassandraAnnotatedPojo.TABLE_NAME, keyspace = "flink") -public class CustomCassandraAnnotatedPojo { - - public static final String TABLE_NAME = "batches"; - - @Column(name = "id") - private String id; - - @Column(name = "counter") - private Integer counter; - - @Column(name = "batch_id") - private Integer batchId; - - /** Necessary for the driver's mapper instantiation. */ - public CustomCassandraAnnotatedPojo() {} - - public CustomCassandraAnnotatedPojo(String id, Integer counter, Integer batchId) { - this.id = id; - this.counter = counter; - this.batchId = batchId; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public Integer getCounter() { - return counter; - } - - public void setCounter(Integer counter) { - this.counter = counter; - } - - public Integer getBatchId() { - return batchId; - } - - public void setBatchId(Integer batchId) { - this.batchId = batchId; - } -} diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index fb6733e..764c001 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -21,7 +21,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat; -import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import com.datastax.driver.core.Cluster; @@ -50,16 +49,12 @@ public class BatchPojoExample { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); - List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = + List<Pojo> customCassandraAnnotatedPojos = IntStream.range(0, 20) - .mapToObj( - x -> - new CustomCassandraAnnotatedPojo( - UUID.randomUUID().toString(), x, 0)) + .mapToObj(x -> new Pojo(UUID.randomUUID().toString(), x, 0)) .collect(Collectors.toList()); - DataSet<CustomCassandraAnnotatedPojo> dataSet = - env.fromCollection(customCassandraAnnotatedPojos); + DataSet<Pojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos); ClusterBuilder clusterBuilder = new ClusterBuilder() { @@ -74,7 +69,7 @@ public class BatchPojoExample { dataSet.output( new CassandraPojoOutputFormat<>( clusterBuilder, - CustomCassandraAnnotatedPojo.class, + Pojo.class, () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})); env.execute("Write"); @@ -82,12 +77,12 @@ public class BatchPojoExample { /* * This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat. */ - DataSet<CustomCassandraAnnotatedPojo> inputDS = + DataSet<Pojo> inputDS = env.createInput( new CassandraPojoInputFormat<>( SELECT_QUERY, clusterBuilder, - CustomCassandraAnnotatedPojo.class, + Pojo.class, () -> new Mapper.Option[] { Mapper.Option.consistencyLevel(ConsistencyLevel.ANY) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java similarity index 84% rename from flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java rename to flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java index aa74310..559f107 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.cassandra; +package org.apache.flink.batch.connectors.cassandra.example; import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.Table; import java.io.Serializable; -/** Test Pojo with DataStax annotations used (no keyspace). */ -@Table(name = "testPojoNoAnnotatedKeyspace") -public class PojoNoAnnotatedKeyspace implements Serializable { +/** Test Pojo with DataStax annotations used. */ +@Table(keyspace = "flink", name = "batches") +public class Pojo implements Serializable { private static final long serialVersionUID = 1038054554690916991L; @@ -37,7 +37,10 @@ public class PojoNoAnnotatedKeyspace implements Serializable { @Column(name = "batch_id") private int batchID; - public PojoNoAnnotatedKeyspace(String id, int counter, int batchID) { + // required for deserialization + public Pojo() {} + + public Pojo(String id, int counter, int batchID) { this.id = id; this.counter = counter; this.batchID = batchID; diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 4c24942..09212c5 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -36,7 +36,6 @@ import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; -import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.streaming.api.datastream.DataStream; @@ -58,6 +57,10 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.annotations.Table; +import net.bytebuddy.ByteBuddy; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -74,19 +77,18 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.images.builder.Transferable; import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -108,8 +110,6 @@ public class CassandraConnectorITCase private static final long CONNECTION_RETRY_DELAY = 500L; private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); - private static final String TABLE_POJO = "test"; - private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace"; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -206,6 +206,61 @@ public class CassandraConnectorITCase } } + private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, String tableName) { + return new ByteBuddy() + .redefine(Pojo.class) + .name("org.apache.flink.streaming.connectors.cassandra.Pojo" + tableName) + .annotateType(createTableAnnotation(keyspace, tableName)) + .make() + .load(Pojo.class.getClassLoader()) + .getLoaded(); + } + + @NotNull + private static Table createTableAnnotation(String keyspace, String tableName) { + return new Table() { + + @Override + public String keyspace() { + return keyspace; + } + + @Override + public String name() { + return tableName; + } + + @Override + public boolean caseSensitiveKeyspace() { + return false; + } + + @Override + public boolean caseSensitiveTable() { + return false; + } + + @Override + public String writeConsistency() { + return ""; + } + + @Override + public String readConsistency() { + return ""; + } + + @Override + public Class<? extends Annotation> annotationType() { + return Table.class; + } + }; + } + + // ------------------------------------------------------------------------ + // Utility methods + // ------------------------------------------------------------------------ + public static CassandraContainer createCassandraContainer() { CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_3); cassandra.withJmxReporting(false); @@ -213,6 +268,87 @@ public class CassandraConnectorITCase return cassandra; } + private static void raiseCassandraRequestsTimeouts() { + try { + final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath(); + CASSANDRA_CONTAINER.copyFileFromContainer( + "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString()); + String configuration = + new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8); + String patchedConfiguration = + configuration + .replaceAll( + "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000") + .replaceAll( + "read_request_timeout_in_ms: [0-9]+", + "read_request_timeout_in_ms: 15000") + .replaceAll( + "write_request_timeout_in_ms: [0-9]+", + "write_request_timeout_in_ms: 6000"); + CASSANDRA_CONTAINER.copyFileToContainer( + Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)), + "/etc/cassandra/cassandra.yaml"); + } catch (IOException e) { + throw new RuntimeException("Unable to open Cassandra configuration file ", e); + } + } + + private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) { + final CassandraPojoInputFormat<T> source = + new CassandraPojoInputFormat<>( + injectTableName(SELECT_DATA_QUERY), builderForReading, annotatedPojoClass); + List<T> result = new ArrayList<>(); + + try { + source.configure(new Configuration()); + source.open(null); + while (!source.reachedEnd()) { + T temp = source.nextRecord(null); + result.add(temp); + } + } finally { + source.close(); + } + return result; + } + + private <T> List<T> writePojosWithOutputFormat(Class<T> annotatedPojoClass) throws Exception { + final CassandraPojoOutputFormat<T> sink = + new CassandraPojoOutputFormat<>( + builderForWriting, + annotatedPojoClass, + () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + + final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass); + List<T> pojos = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + pojos.add(pojoConstructor.newInstance(UUID.randomUUID().toString(), i, 0)); + } + try { + sink.configure(new Configuration()); + sink.open(0, 1); + for (T pojo : pojos) { + sink.writeRecord(pojo); + } + } finally { + sink.close(); + } + return pojos; + } + + private <T> Constructor<T> getPojoConstructor(Class<T> annotatedPojoClass) + throws NoSuchMethodException { + return annotatedPojoClass.getConstructor(String.class, Integer.TYPE, Integer.TYPE); + } + + private String injectTableName(String target) { + return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID); + } + + // ------------------------------------------------------------------------ + // Tests initialization + // ------------------------------------------------------------------------ + @BeforeClass public static void startAndInitializeCassandra() { raiseCassandraRequestsTimeouts(); @@ -249,46 +385,6 @@ public class CassandraConnectorITCase CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial")); } - private static void raiseCassandraRequestsTimeouts() { - try { - final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath(); - CASSANDRA_CONTAINER.copyFileFromContainer( - "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString()); - String configuration = - new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8); - String patchedConfiguration = - configuration - .replaceAll( - "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000") - .replaceAll( - "read_request_timeout_in_ms: [0-9]+", - "read_request_timeout_in_ms: 15000") - .replaceAll( - "write_request_timeout_in_ms: [0-9]+", - "write_request_timeout_in_ms: 6000"); - CASSANDRA_CONTAINER.copyFileToContainer( - Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)), - "/etc/cassandra/cassandra.yaml"); - } catch (IOException e) { - throw new RuntimeException("Unable to open Cassandra configuration file ", e); - } - } - - @Test - public void testRaiseCassandraRequestsTimeouts() throws IOException { - // raiseCassandraRequestsTimeouts() was already called in @BeforeClass, - // do not change the container conf twice, just assert that it was indeed changed in the - // container - final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath(); - CASSANDRA_CONTAINER.copyFileFromContainer( - "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString()); - final String configuration = - new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8); - assertTrue(configuration.contains("request_timeout_in_ms: 30000")); - assertTrue(configuration.contains("read_request_timeout_in_ms: 15000")); - assertTrue(configuration.contains("write_request_timeout_in_ms: 6000")); - } - @Before public void createTable() { tableID = random.nextInt(Integer.MAX_VALUE); @@ -299,12 +395,8 @@ public class CassandraConnectorITCase public void dropTables() { // need to drop tables in case of retrials. Need to drop all the tables // that are created in test because this method is executed with every test - session.execute( - DROP_TABLE_QUERY.replace( - TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); - session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); - session.execute( - DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); + session.execute(DROP_KEYSPACE_QUERY); + session.execute(CREATE_KEYSPACE_QUERY); } @AfterClass @@ -319,6 +411,34 @@ public class CassandraConnectorITCase } // ------------------------------------------------------------------------ + // Technical Tests + // ------------------------------------------------------------------------ + + @Test + public void testAnnotatePojoWithTable() { + final String tableName = TABLE_NAME_PREFIX + tableID; + + final Class<? extends Pojo> annotatedPojoClass = annotatePojoWithTable(KEYSPACE, tableName); + final Table pojoTableAnnotation = annotatedPojoClass.getAnnotation(Table.class); + assertTrue(pojoTableAnnotation.name().contains(tableName)); + } + + @Test + public void testRaiseCassandraRequestsTimeouts() throws IOException { + // raiseCassandraRequestsTimeouts() was already called in @BeforeClass, + // do not change the container conf twice, just assert that it was indeed changed in the + // container + final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath(); + CASSANDRA_CONTAINER.copyFileFromContainer( + "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString()); + final String configuration = + new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8); + assertTrue(configuration.contains("request_timeout_in_ms: 30000")); + assertTrue(configuration.contains("read_request_timeout_in_ms: 15000")); + assertTrue(configuration.contains("write_request_timeout_in_ms: 6000")); + } + + // ------------------------------------------------------------------------ // Exactly-once Tests // ------------------------------------------------------------------------ @@ -520,43 +640,36 @@ public class CassandraConnectorITCase @Test public void testCassandraPojoAtLeastOnceSink() throws Exception { - session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); + final Class<? extends Pojo> annotatedPojoClass = + annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID); + writePojos(annotatedPojoClass, null); - CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builderForWriting); - try { - sink.open(new Configuration()); - for (int x = 0; x < 20; x++) { - sink.send(new Pojo(UUID.randomUUID().toString(), x, 0)); - } - } finally { - sink.close(); - } - - ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); Assert.assertEquals(20, rs.all().size()); } @Test public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception { - session.execute( - CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); + final Class<? extends Pojo> annotatedPojoClass = + annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID); + writePojos(annotatedPojoClass, KEYSPACE); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); + Assert.assertEquals(20, rs.all().size()); + } - CassandraPojoSink<PojoNoAnnotatedKeyspace> sink = - new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, KEYSPACE); + private <T> void writePojos(Class<T> annotatedPojoClass, @Nullable String keyspace) + throws Exception { + final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass); + CassandraPojoSink<T> sink = + new CassandraPojoSink<>(annotatedPojoClass, builderForWriting, null, keyspace); try { sink.open(new Configuration()); for (int x = 0; x < 20; x++) { - sink.send(new PojoNoAnnotatedKeyspace(UUID.randomUUID().toString(), x, 0)); + sink.send(pojoConstructor.newInstance(UUID.randomUUID().toString(), x, 0)); } - } finally { sink.close(); } - ResultSet rs = - session.execute( - SELECT_DATA_QUERY.replace( - TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); - Assert.assertEquals(20, rs.all().size()); } @Test @@ -603,9 +716,9 @@ public class CassandraConnectorITCase @Test public void testRetrialAndDropTables() { - session.execute( - CREATE_TABLE_QUERY.replace( - TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); + // should not fail with table exists upon retrial + // as @After method that truncate the keyspace is called upon retrials. + annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID); if (retrialsCount < 2) { retrialsCount++; throw new NoHostAvailableException(new HashMap<>()); @@ -615,64 +728,16 @@ public class CassandraConnectorITCase @Test public void testCassandraBatchPojoFormat() throws Exception { - session.execute( - CREATE_TABLE_QUERY.replace( - TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); - - OutputFormat<CustomCassandraAnnotatedPojo> sink = - new CassandraPojoOutputFormat<>( - builderForWriting, - CustomCassandraAnnotatedPojo.class, - () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + final Class<? extends Pojo> annotatedPojoClass = + annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID); - List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = - IntStream.range(0, 20) - .mapToObj( - x -> - new CustomCassandraAnnotatedPojo( - UUID.randomUUID().toString(), x, 0)) - .collect(Collectors.toList()); - try { - sink.configure(new Configuration()); - sink.open(0, 1); - for (CustomCassandraAnnotatedPojo customCassandraAnnotatedPojo : - customCassandraAnnotatedPojos) { - sink.writeRecord(customCassandraAnnotatedPojo); - } - } finally { - sink.close(); - } - ResultSet rs = - session.execute( - SELECT_DATA_QUERY.replace( - TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); + final List<? extends Pojo> pojos = writePojosWithOutputFormat(annotatedPojoClass); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); Assert.assertEquals(20, rs.all().size()); - InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = - new CassandraPojoInputFormat<>( - SELECT_DATA_QUERY.replace( - TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME), - builderForReading, - CustomCassandraAnnotatedPojo.class); - List<CustomCassandraAnnotatedPojo> result = new ArrayList<>(); - - try { - source.configure(new Configuration()); - source.open(null); - while (!source.reachedEnd()) { - CustomCassandraAnnotatedPojo temp = source.nextRecord(null); - result.add(temp); - } - } finally { - source.close(); - } - + final List<? extends Pojo> result = readPojosWithInputFormat(annotatedPojoClass); Assert.assertEquals(20, result.size()); - result.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter)); - customCassandraAnnotatedPojos.sort( - Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter)); - - assertThat(result, samePropertyValuesAs(customCassandraAnnotatedPojos)); + assertThat(result, samePropertyValuesAs(pojos)); } @Test @@ -738,10 +803,6 @@ public class CassandraConnectorITCase Assert.assertEquals(rowCollection.size(), rows.size()); } - private String injectTableName(String target) { - return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID); - } - @Test public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception { Class<scala.Tuple1<String>> c = diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java index ce2c213..2efde66 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java @@ -18,12 +18,10 @@ package org.apache.flink.streaming.connectors.cassandra; import com.datastax.driver.mapping.annotations.Column; -import com.datastax.driver.mapping.annotations.Table; import java.io.Serializable; -/** Test Pojo with DataStax annotations used. */ -@Table(keyspace = "flink", name = "test") +/** Test Pojo with DataStax annotations created dynamically. */ public class Pojo implements Serializable { private static final long serialVersionUID = 1038054554690916991L; @@ -37,6 +35,9 @@ public class Pojo implements Serializable { @Column(name = "batch_id") private int batchID; + // required for deserialization + public Pojo() {} + public Pojo(String id, int counter, int batchID) { this.id = id; this.counter = counter;
