[FLINK-5101] Refactor CassandraConnectorITCase

This closes #2866.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0be04b45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0be04b45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0be04b45

Branch: refs/heads/master
Commit: 0be04b454f58d5575bd6fab5755aa1264f363b91
Parents: 948bb9f
Author: zentol <ches...@apache.org>
Authored: Wed May 10 12:08:11 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     | 213 ++++++++-----------
 1 file changed, 89 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
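
In short: the test no longer spins up a LocalFlinkMiniCluster (the PowerMock
scaffolding goes away with it) and instead opens and invokes the sinks and
formats directly, and every test now writes to its own randomly named table
instead of truncating a shared flink.test table. A condensed, non-authoritative
sketch of that table-isolation pattern is below; the Session and the query
constants mirror the diff that follows, and the class wrapper is illustrative
only.

    import java.util.Random;

    import com.datastax.driver.core.Session;

    class TableIsolationSketch {
        private static final String TABLE_NAME_PREFIX = "flink_";
        private static final String TABLE_NAME_VARIABLE = "$TABLE";
        private static final String CREATE_TABLE_QUERY =
            "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";

        private static final Random random = new Random();
        private int tableID;

        // Runs before each test: pick a random id and create a fresh table,
        // so tests cannot interfere and no TRUNCATE/DROP cleanup is needed.
        void createTable(Session session) {
            tableID = random.nextInt(Integer.MAX_VALUE);
            session.execute(injectTableName(CREATE_TABLE_QUERY));
        }

        // Substitutes the placeholder in any of the parameterized queries.
        private String injectTableName(String target) {
            return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
        }
    }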


http://git-wip-us.apache.org/repos/asf/flink/blob/0be04b45/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f2e8f8b..06f3c35 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -28,39 +28,24 @@ import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.TestEnvironment;
 
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +56,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Random;
 import java.util.Scanner;
 import java.util.UUID;
 
@@ -100,12 +87,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        private static Cluster cluster;
        private static Session session;
 
+       private static final String TABLE_NAME_PREFIX = "flink_";
+       private static final String TABLE_NAME_VARIABLE = "$TABLE";
        private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-       private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
-       private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
-       private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
-       private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
-       private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+       private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
+       private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
+       private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+
+       private static final Random random = new Random();
+       private int tableID;
 
        private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
 
@@ -129,12 +119,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                }
        }
 
-       private static LocalFlinkMiniCluster flinkCluster;
-
-       // ------------------------------------------------------------------------
-       //  Cluster Setup (Cassandra & Flink)
-       // ------------------------------------------------------------------------
-
        @BeforeClass
        public static void startCassandra() throws IOException {
 
@@ -173,39 +157,39 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                        cassandra.start();
                }
 
-               try {
-                       Thread.sleep(1000 * 10);
-               } catch (InterruptedException e) { //give cassandra a few seconds to start up
+               // start establishing a connection within 30 seconds
+               long start = System.nanoTime();
+               long deadline = start + 30_000_000_000L;
+               while (true) {
+                       try {
+                               cluster = builder.getCluster();
+                               session = cluster.connect();
+                               break;
+                       } catch (Exception e) {
+                               if (System.nanoTime() > deadline) {
+                                       throw e;
+                               }
+                               try {
+                                       Thread.sleep(500);
+                               } catch (InterruptedException ignored) {
+                               }
+                       }
                }
-
-               cluster = builder.getCluster();
-               session = cluster.connect();
+               LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
 
                session.execute(CREATE_KEYSPACE_QUERY);
-               session.execute(CREATE_TABLE_QUERY);
-       }
-
-       @BeforeClass
-       public static void startFlink() throws Exception {
-               Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-               flinkCluster = new LocalFlinkMiniCluster(config);
-               flinkCluster.start();
+               session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
        }
 
-       @AfterClass
-       public static void stopFlink() {
-               if (flinkCluster != null) {
-                       flinkCluster.stop();
-                       flinkCluster = null;
-               }
+       @Before
+       public void createTable() {
+               tableID = random.nextInt(Integer.MAX_VALUE);
+               session.execute(injectTableName(CREATE_TABLE_QUERY));
        }
 
        @AfterClass
        public static void closeCassandra() {
                if (session != null) {
-                       session.executeAsync(DROP_KEYSPACE_QUERY);
                        session.close();
                }
 
@@ -224,28 +208,13 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        // ------------------------------------------------------------------------
-       //  Test preparation & cleanup
-       // ------------------------------------------------------------------------
-
-       @Before
-       public void initializeExecutionEnvironment() {
-               TestStreamEnvironment.setAsContext(flinkCluster, 4);
-               new TestEnvironment(flinkCluster, 4, false).setAsContext();
-       }
-
-       @After
-       public void deleteSchema() throws Exception {
-               session.executeAsync(CLEAR_TABLE_QUERY);
-       }
-
-       // ------------------------------------------------------------------------
        //  Exactly-once Tests
        // ------------------------------------------------------------------------
 
        @Override
        protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
                return new CassandraTupleWriteAheadSink<>(
-                       INSERT_DATA_QUERY,
+                       injectTableName(INSERT_DATA_QUERY),
                        TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
                        builder,
                        new CassandraCommitter(builder));
@@ -264,7 +233,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        @Override
        protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
                        list.add(x);
@@ -279,7 +248,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        @Override
        protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
                        list.add(x);
@@ -294,7 +263,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        @Override
        protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 20; x++) {
                        list.add(x);
@@ -324,7 +293,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                }
 
                ArrayList<Integer> actual = new ArrayList<>();
-               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
                for (Row s : result) {
                        actual.add(s.getInt("counter"));
                }
@@ -335,16 +304,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
        @Test
        public void testCassandraCommitter() throws Exception {
-               CassandraCommitter cc1 = new CassandraCommitter(builder);
-               cc1.setJobId("job");
+               String jobID = new JobID().toString();
+               CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+               cc1.setJobId(jobID);
                cc1.setOperatorId("operator");
 
-               CassandraCommitter cc2 = new CassandraCommitter(builder);
-               cc2.setJobId("job");
+               CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+               cc2.setJobId(jobID);
                cc2.setOperatorId("operator");
 
-               CassandraCommitter cc3 = new CassandraCommitter(builder);
-               cc3.setJobId("job");
+               CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+               cc3.setJobId(jobID);
                cc3.setOperatorId("operator1");
 
                cc1.createResource();
@@ -370,8 +340,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                cc2.close();
                cc3.close();
 
-               cc1 = new CassandraCommitter(builder);
-               cc1.setJobId("job");
+               cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+               cc1.setJobId(jobID);
                cc1.setOperatorId("operator");
 
                cc1.open();
@@ -389,70 +359,65 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
        @Test
        public void testCassandraTupleAtLeastOnceSink() throws Exception {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(1);
+               CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
 
-               DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-               source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+               sink.open(new Configuration());
+
+               for (Tuple3<String, Integer, Integer> value : collection) {
+                       sink.send(value);
+               }
 
-               env.execute();
+               sink.close();
 
-               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
                Assert.assertEquals(20, rs.all().size());
        }
 
        @Test
        public void testCassandraPojoAtLeastOnceSink() throws Exception {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(1);
-
-               DataStreamSource<Pojo> source = env
-                       .addSource(new SourceFunction<Pojo>() {
-
-                               private boolean running = true;
-                               private volatile int cnt = 0;
-
-                               @Override
-                               public void run(SourceContext<Pojo> ctx) throws Exception {
-                                       while (running) {
-                                               ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
-                                               cnt++;
-                                               if (cnt == 20) {
-                                                       cancel();
-                                               }
-                                       }
-                               }
+               session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 
-                               @Override
-                               public void cancel() {
-                                       running = false;
-                               }
-                       });
+               CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
 
-               source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+               sink.open(new Configuration());
 
-               env.execute();
+               for (int x = 0; x < 20; x++) {
+                       sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
+               }
 
-               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               sink.close();
+
+               ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
                Assert.assertEquals(20, rs.all().size());
        }
 
        @Test
        public void testCassandraBatchFormats() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(1);
+               OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+               sink.configure(new Configuration());
+               sink.open(0, 1);
 
-               DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
-               dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+               for (Tuple3<String, Integer, Integer> value : collection) {
+                       sink.writeRecord(value);
+               }
 
-               env.execute("Write data");
+               sink.close();
 
-               DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
-                       new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-                       TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+               InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
+               source.configure(new Configuration());
+               source.open(null);
 
+               List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+
+               while (!source.reachedEnd()) {
+                       result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
+               }
+
+               source.close();
+               Assert.assertEquals(20, result.size());
+       }
 
-               long count = inputDS.count();
-               Assert.assertEquals(count, 20L);
+       private String injectTableName(String target) {
+               return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
        }
 }
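
A note on the connection setup above: the fixed Thread.sleep(1000 * 10) is
replaced by a bounded retry loop against a 30-second deadline. A generic,
self-contained sketch of that pattern follows; retryWithin and its parameters
are illustrative helpers, not Flink API, and the sketch keeps both the deadline
check and the elapsed-time report in a single clock domain (System.nanoTime).

    import java.util.concurrent.Callable;

    final class ConnectRetrySketch {
        // Retry "action" until it succeeds or timeoutMillis elapses; on
        // timeout, rethrow the last failure. All timing uses nanoTime.
        static <T> T retryWithin(long timeoutMillis, Callable<T> action) throws Exception {
            final long start = System.nanoTime();
            final long deadline = start + timeoutMillis * 1_000_000L;
            while (true) {
                try {
                    T result = action.call();
                    System.out.printf("succeeded after %dms%n", (System.nanoTime() - start) / 1_000_000L);
                    return result;
                } catch (Exception e) {
                    if (System.nanoTime() > deadline) {
                        throw e; // give up and surface the last connection error
                    }
                    Thread.sleep(500); // brief backoff before the next attempt
                }
            }
        }
    }

Hypothetical usage matching the test above (the diff inlines the loop so it
can also keep the Cluster reference):

    session = ConnectRetrySketch.retryWithin(30_000, () -> builder.getCluster().connect());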
