[FLINK-5101] Refactor CassandraConnectorITCase

This closes #2866.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0be04b45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0be04b45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0be04b45 Branch: refs/heads/master Commit: 0be04b454f58d5575bd6fab5755aa1264f363b91 Parents: 948bb9f Author: zentol <ches...@apache.org> Authored: Wed May 10 12:08:11 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Fri May 12 20:20:53 2017 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraConnectorITCase.java | 213 ++++++++----------- 1 file changed, 89 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0be04b45/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index f2e8f8b..06f3c35 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -28,39 +28,24 @@ import com.datastax.driver.core.Session; import org.apache.cassandra.service.CassandraDaemon; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import 
org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.TestEnvironment; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; - -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +56,8 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.Random; import java.util.Scanner; import 
java.util.UUID; @@ -100,12 +87,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri private static Cluster cluster; private static Session session; + private static final String TABLE_NAME_PREFIX = "flink_"; + private static final String TABLE_NAME_VARIABLE = "$TABLE"; private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; - private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; - private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; - private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; - private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; + private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);"; + private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)"; + private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." 
+ TABLE_NAME_VARIABLE + ';'; + + private static final Random random = new Random(); + private int tableID; private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); @@ -129,12 +119,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri } } - private static LocalFlinkMiniCluster flinkCluster; - - // ------------------------------------------------------------------------ - // Cluster Setup (Cassandra & Flink) - // ------------------------------------------------------------------------ - @BeforeClass public static void startCassandra() throws IOException { @@ -173,39 +157,39 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri cassandra.start(); } - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { //give cassandra a few seconds to start up + // start establishing a connection within 30 seconds + long start = System.nanoTime(); + long deadline = start + 30_000_000_000L; + while (true) { + try { + cluster = builder.getCluster(); + session = cluster.connect(); + break; + } catch (Exception e) { + if (System.nanoTime() > deadline) { + throw e; + } + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + } } - - cluster = builder.getCluster(); - session = cluster.connect(); + LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start); session.execute(CREATE_KEYSPACE_QUERY); - session.execute(CREATE_TABLE_QUERY); - } - - @BeforeClass - public static void startFlink() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - - flinkCluster = new LocalFlinkMiniCluster(config); - flinkCluster.start(); + session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial")); } - @AfterClass - public static void stopFlink() { - if (flinkCluster != null) { - flinkCluster.stop(); - flinkCluster = null; - } + 
@Before + public void createTable() { + tableID = random.nextInt(Integer.MAX_VALUE); + session.execute(injectTableName(CREATE_TABLE_QUERY)); } @AfterClass public static void closeCassandra() { if (session != null) { - session.executeAsync(DROP_KEYSPACE_QUERY); session.close(); } @@ -224,28 +208,13 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri } // ------------------------------------------------------------------------ - // Test preparation & cleanup - // ------------------------------------------------------------------------ - - @Before - public void initializeExecutionEnvironment() { - TestStreamEnvironment.setAsContext(flinkCluster, 4); - new TestEnvironment(flinkCluster, 4, false).setAsContext(); - } - - @After - public void deleteSchema() throws Exception { - session.executeAsync(CLEAR_TABLE_QUERY); - } - - // ------------------------------------------------------------------------ // Exactly-once Tests // ------------------------------------------------------------------------ @Override protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception { return new CassandraTupleWriteAheadSink<>( - INSERT_DATA_QUERY, + injectTableName(INSERT_DATA_QUERY), TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), builder, new CassandraCommitter(builder)); @@ -264,7 +233,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY)); ArrayList<Integer> list = new ArrayList<>(); for (int x = 1; x <= 60; x++) { list.add(x); @@ -279,7 +248,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void 
verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY)); ArrayList<Integer> list = new ArrayList<>(); for (int x = 1; x <= 60; x++) { list.add(x); @@ -294,7 +263,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY)); ArrayList<Integer> list = new ArrayList<>(); for (int x = 1; x <= 20; x++) { list.add(x); @@ -324,7 +293,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri } ArrayList<Integer> actual = new ArrayList<>(); - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY)); for (Row s : result) { actual.add(s.getInt("counter")); } @@ -335,16 +304,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Test public void testCassandraCommitter() throws Exception { - CassandraCommitter cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); + String jobID = new JobID().toString(); + CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc1.setJobId(jobID); cc1.setOperatorId("operator"); - CassandraCommitter cc2 = new CassandraCommitter(builder); - cc2.setJobId("job"); + CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc2.setJobId(jobID); cc2.setOperatorId("operator"); - CassandraCommitter cc3 = new CassandraCommitter(builder); - cc3.setJobId("job"); + CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc3.setJobId(jobID); cc3.setOperatorId("operator1"); 
cc1.createResource(); @@ -370,8 +340,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri cc2.close(); cc3.close(); - cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); + cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc1.setJobId(jobID); cc1.setOperatorId("operator"); cc1.open(); @@ -389,70 +359,65 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Test public void testCassandraTupleAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder); - DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection); - source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + sink.open(new Configuration()); + + for (Tuple3<String, Integer, Integer> value : collection) { + sink.send(value); + } - env.execute(); + sink.close(); - ResultSet rs = session.execute(SELECT_DATA_QUERY); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); Assert.assertEquals(20, rs.all().size()); } @Test public void testCassandraPojoAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStreamSource<Pojo> source = env - .addSource(new SourceFunction<Pojo>() { - - private boolean running = true; - private volatile int cnt = 0; - - @Override - public void run(SourceContext<Pojo> ctx) throws Exception { - while (running) { - ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); - cnt++; - if (cnt == 20) { - cancel(); - } - } - } + session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test")); - @Override - public void cancel() { - running = false; - } - }); + CassandraPojoSink<Pojo> sink 
= new CassandraPojoSink<>(Pojo.class, builder); - source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); + sink.open(new Configuration()); - env.execute(); + for (int x = 0; x < 20; x++) { + sink.send(new Pojo(UUID.randomUUID().toString(), x, 0)); + } - ResultSet rs = session.execute(SELECT_DATA_QUERY); + sink.close(); + + ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test")); Assert.assertEquals(20, rs.all().size()); } @Test public void testCassandraBatchFormats() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); + sink.configure(new Configuration()); + sink.open(0, 1); - DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection); - dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + for (Tuple3<String, Integer, Integer> value : collection) { + sink.writeRecord(value); + } - env.execute("Write data"); + sink.close(); - DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( - new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), - TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){})); + InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder); + source.configure(new Configuration()); + source.open(null); + List<Tuple3<String, Integer, Integer>> result = new ArrayList<>(); + + while (!source.reachedEnd()) { + result.add(source.nextRecord(new Tuple3<String, Integer, Integer>())); + } + + source.close(); + Assert.assertEquals(20, result.size()); + } - long count = inputDS.count(); - Assert.assertEquals(count, 20L); + private String injectTableName(String target) { + return target.replace(TABLE_NAME_VARIABLE, 
TABLE_NAME_PREFIX + tableID); } }