Repository: flink
Updated Branches:
  refs/heads/master 381bf5912 -> 62523acbe
[FLINK-4177] Harden CassandraConnectorITCase

This closes #2484.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62523acb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62523acb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62523acb

Branch: refs/heads/master
Commit: 62523acbe175cf159fe1b4ab6cf5c0412fc4d232
Parents: 381bf59
Author: zentol <[email protected]>
Authored: Mon Nov 14 14:02:44 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 14 14:38:38 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-cassandra/pom.xml           |  11 +-
 .../connectors/cassandra/CassandraSinkBase.java |  39 +-
 .../cassandra/CassandraConnectorITCase.java     | 374 ++++++++-----------
 .../connectors/cassandra/CassandraService.java  | 118 ++++++
 .../src/test/resources/cassandra.yaml           |  41 +-
 5 files changed, 351 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62523acb/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
index 3a1731c..07cdc09 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -37,8 +37,8 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
+		<cassandra.version>2.2.7</cassandra.version>
+		<driver.version>3.0.3</driver.version>
 	</properties>
 
 	<build>
@@ -159,6 +159,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<!-- we need this dependency for the EmbeddedCassandraService-->
+		<dependency>
+			<groupId>org.caffinitas.ohc</groupId>
+			<artifactId>ohc-core</artifactId>
+			<version>0.4.5</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.cassandra</groupId>
 			<artifactId>cassandra-all</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/62523acb/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..9c4c430 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -40,11 +42,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
-	protected transient Throwable exception = null;
+	protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
 
+	protected final AtomicInteger updatesPending = new AtomicInteger();
+
 	protected CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
@@ -55,11 +59,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
 			}
 
 			@Override
 			public void onFailure(Throwable t) {
-				exception = t;
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
+				exception.set(t);
+
+				LOG.error("Error while sending value.", t);
 			}
 		};
@@ -69,10 +86,12 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void invoke(IN value) throws Exception {
-		if (exception != null) {
-			throw new IOException("invoke() failed", exception);
+		Throwable e = exception.get();
+		if (e != null) {
+			throw new IOException("Error while sending value.", e);
 		}
 		ListenableFuture<V> result = send(value);
+		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
 	}
 
@@ -80,6 +99,14 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void close() {
+		while (updatesPending.get() > 0) {
+			synchronized (updatesPending) {
+				try {
+					updatesPending.wait();
+				} catch (InterruptedException e) {
+				}
+			}
+		}
 		try {
 			if (session != null) {
 				session.close();
@@ -94,5 +121,9 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		} catch (Exception e) {
 			LOG.error("Error while closing cluster.", e);
 		}
+		Throwable e = exception.get();
+		if (e != null) {
+			LOG.error("Error while sending value.", e);
+		}
 	}
 }
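The rewritten close() above blocks until every in-flight write has been acknowledged, using updatesPending as both counter and monitor: invoke() increments it before registering the callback, and the callback decrements it and notifies once it reaches zero. A minimal, self-contained sketch of that handshake (illustrative class and method names, not the connector's API; the bounded wait is an addition of this sketch, the sink itself uses an unbounded wait()):

    import java.util.concurrent.atomic.AtomicInteger;

    public class PendingUpdatesDemo {
        // Counter guarding in-flight writes, mirroring CassandraSinkBase#updatesPending.
        private final AtomicInteger updatesPending = new AtomicInteger();

        // Called when an async write is issued (cf. invoke()).
        void onSend() {
            updatesPending.incrementAndGet();
        }

        // Called from the async completion callback (cf. onSuccess/onFailure).
        void onCompleted() {
            if (updatesPending.decrementAndGet() == 0) {
                synchronized (updatesPending) {
                    updatesPending.notifyAll();
                }
            }
        }

        // Called on close(): block until every in-flight write has completed.
        void awaitNoPendingUpdates() throws InterruptedException {
            while (updatesPending.get() > 0) {
                synchronized (updatesPending) {
                    updatesPending.wait(100); // bounded wait so a notify between get() and wait() cannot hang us
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            PendingUpdatesDemo demo = new PendingUpdatesDemo();
            for (int i = 0; i < 5; i++) {
                demo.onSend();
                new Thread(demo::onCompleted).start(); // simulated async completion
            }
            demo.awaitNoPendingUpdates();
            System.out.println("all updates flushed");
        }
    }
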
http://git-wip-us.apache.org/repos/asf/flink/blob/62523acb/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2bb6fd1..258ef52 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -20,192 +20,138 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
+import com.datastax.driver.core.SocketOptions;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.TestEnvironment;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Scanner;
+import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
-
 @SuppressWarnings("serial")
-public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-	private static File tmpDir;
 
-	private static final boolean EMBEDDED = true;
+	private static final String TABLE_NAME_PREFIX = "flink_";
+	private static final String TABLE_NAME_VARIABLE = "$TABLE";
+	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
 
-	private static EmbeddedCassandraService cassandra;
+	private static final boolean EMBEDDED = true;
 
-	private static ClusterBuilder builder = new ClusterBuilder() {
+	private static final ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
 				.addContactPoint("127.0.0.1")
+				.withSocketOptions(new SocketOptions().setConnectTimeoutMillis(30000))
 				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+				.withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 32, 32).setMaxRequestsPerConnection(HostDistance.LOCAL, 2048).setPoolTimeoutMillis(15000))
 				.withoutJMXReporting()
 				.withoutMetrics().build();
 		}
 	};
 
+	private static final List<Tuple3<String, Integer, Integer>> collection = new ArrayList<>();
+
+	private static CassandraService cassandra;
 	private static Cluster cluster;
 	private static Session session;
 
-	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
-	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
-	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
-	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
-	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+	private static final Random random = new Random();
+	private int tableID;
 
-	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
-
-	static {
+	@BeforeClass
+	public static void generateCollection() {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
 		}
 	}
 
-	private static class EmbeddedCassandraService {
-		CassandraDaemon cassandraDaemon;
-
-		public void start() throws IOException {
-			this.cassandraDaemon = new CassandraDaemon();
-			this.cassandraDaemon.init(null);
-			this.cassandraDaemon.start();
-		}
-
-		public void stop() {
-			this.cassandraDaemon.stop();
-		}
-	}
-
-	private static LocalFlinkMiniCluster flinkCluster;
-
-	// ------------------------------------------------------------------------
-	//  Cluster Setup (Cassandra & Flink)
-	// ------------------------------------------------------------------------
-
 	@BeforeClass
 	public static void startCassandra() throws IOException {
 
 		// check if we should run this test, current Cassandra version requires Java >= 1.8
-		org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
-
-		// generate temporary files
-		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
-		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
-		assertTrue(tmp.createNewFile());
-
-		try (
-			BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
-			//copy cassandra.yaml; inject absolute paths into cassandra.yaml
-			Scanner scanner = new Scanner(file);
-		) {
-			while (scanner.hasNextLine()) {
-				String line = scanner.nextLine();
-				line = line.replace("$PATH", "'" + tmp.getParentFile());
-				b.write(line + "\n");
-				b.flush();
-			}
-		}
-
+		CommonTestUtils.assumeJava8();
 
-		// Tell cassandra where the configuration files are.
-		// Use the test configuration file.
-		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+		try {
+			cassandra = new CassandraService();
+		} catch (Exception e) {
+			LOG.error("Failed to instantiate cassandra service.", e);
+			Assert.fail("Failed to instantiate cassandra service.");
+		}
 
 		if (EMBEDDED) {
-			cassandra = new EmbeddedCassandraService();
-			cassandra.start();
+			cassandra.startProcess();
 		}
 
-		try {
-			Thread.sleep(1000 * 10);
-		} catch (InterruptedException e) { //give cassandra a few seconds to start up
+		long start = System.currentTimeMillis();
+		long deadline = start + 1000 * 30;
+		while (true) {
+			try {
+				cluster = builder.getCluster();
+				session = cluster.connect();
+				break;
+			} catch (Exception e) {
+				if (System.currentTimeMillis() > deadline) {
+					throw e;
+				}
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException ignored) {
+				}
+			}
 		}
-
-		cluster = builder.getCluster();
-		session = cluster.connect();
+		LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
 
 		session.execute(CREATE_KEYSPACE_QUERY);
-		session.execute(CREATE_TABLE_QUERY);
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
 	}
 
-	@BeforeClass
-	public static void startFlink() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-		flinkCluster = new LocalFlinkMiniCluster(config);
-		flinkCluster.start();
-	}
-
-	@AfterClass
-	public static void stopFlink() {
-		if (flinkCluster != null) {
-			flinkCluster.stop();
-			flinkCluster = null;
-		}
+	@Before
+	public void createTable() {
+		tableID = random.nextInt(Integer.MAX_VALUE);
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
 	}
 
 	@AfterClass
 	public static void closeCassandra() {
 		if (session != null) {
-			session.executeAsync(DROP_KEYSPACE_QUERY);
 			session.close();
 		}
@@ -213,39 +159,21 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			cluster.close();
 		}
 
-		if (cassandra != null) {
-			cassandra.stop();
-		}
-
-		if (tmpDir != null) {
-			//noinspection ResultOfMethodCallIgnored
-			tmpDir.delete();
+		if (EMBEDDED) {
+			if (cassandra != null) {
+				cassandra.destroy();
+			}
 		}
 	}
 
 	// ------------------------------------------------------------------------
-	//  Test preparation & cleanup
-	// ------------------------------------------------------------------------
-
-	@Before
-	public void initializeExecutionEnvironment() {
-		TestStreamEnvironment.setAsContext(flinkCluster, 4);
-		new TestEnvironment(flinkCluster, 4, false).setAsContext();
-	}
-
-	@After
-	public void deleteSchema() throws Exception {
-		session.executeAsync(CLEAR_TABLE_QUERY);
-	}
-
-	// ------------------------------------------------------------------------
 	//  Exactly-once Tests
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-		return new CassandraTupleWriteAheadSink<>(
-			INSERT_DATA_QUERY,
+	protected TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+		return new TestCassandraTupleWriteAheadSink<>(
+			TABLE_NAME_PREFIX + tableID,
 			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
 			builder,
 			new CassandraCommitter(builder));
@@ -264,43 +192,42 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsIdealCircumstances(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}
 
 		for (Row s : result) {
-			list.remove(new Integer(s.getInt("counter")));
+			list.remove(Integer.valueOf(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
 	}
 
 	@Override
 	protected void verifyResultsDataPersistenceUponMissedNotify(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}
 
 		for (Row s : result) {
-			list.remove(new Integer(s.getInt("counter")));
+			list.remove(Integer.valueOf(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
 	}
 
 	@Override
 	protected void verifyResultsDataDiscardingUponRestore(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 20; x++) {
 			list.add(x);
@@ -310,23 +237,24 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 
 		for (Row s : result) {
-			list.remove(new Integer(s.getInt("counter")));
+			list.remove(Integer.valueOf(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
 	}
 
 	@Test
 	public void testCassandraCommitter() throws Exception {
-		CassandraCommitter cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
+		String jobID = new JobID().toString();
+		CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc1.setJobId(jobID);
 		cc1.setOperatorId("operator");
 
-		CassandraCommitter cc2 = new CassandraCommitter(builder);
-		cc2.setJobId("job");
+		CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc2.setJobId(jobID);
 		cc2.setOperatorId("operator");
 
-		CassandraCommitter cc3 = new CassandraCommitter(builder);
-		cc3.setJobId("job");
+		CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc3.setJobId(jobID);
 		cc3.setOperatorId("operator1");
 
 		cc1.createResource();
@@ -352,17 +280,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		cc2.close();
 		cc3.close();
 
-		cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
-		cc1.setOperatorId("operator");
+		CassandraCommitter cc4 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc4.setJobId(jobID);
+		cc4.setOperatorId("operator");
 
-		cc1.open();
+		cc4.open();
 
 		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-		Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
-		Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
+		Assert.assertTrue(cc4.isCheckpointCommitted(0, 1));
+		Assert.assertFalse(cc4.isCheckpointCommitted(0, 2));
 
-		cc1.close();
+		cc4.close();
 	}
 
 	// ------------------------------------------------------------------------
@@ -371,70 +299,94 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	@Test
 	public void testCassandraTupleAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
+		CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
 
-		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+		sink.open(new Configuration());
 
-		env.execute();
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.send(value);
+		}
+
+		sink.close();
+
+		synchronized (sink.updatesPending) {
+			if (sink.updatesPending.get() != 0) {
+				sink.updatesPending.wait();
+			}
+		}
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
-		Assert.assertEquals(20, rs.all().size());
+		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
+		try {
+			Assert.assertEquals(20, rs.all().size());
+		} catch (Throwable e) {
+			LOG.error("test failed.", e);
+		}
 	}
 
 	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStreamSource<Pojo> source = env
-			.addSource(new SourceFunction<Pojo>() {
-
-				private boolean running = true;
-				private volatile int cnt = 0;
-
-				@Override
-				public void run(SourceContext<Pojo> ctx) throws Exception {
-					while (running) {
-						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
-						cnt++;
-						if (cnt == 20) {
-							cancel();
-						}
-					}
-				}
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
+		CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
+
+		sink.open(new Configuration());
 
-		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+		for (int x = 0; x < 20; x++) {
+			sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
+		}
 
-		env.execute();
+		sink.close();
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
-		Assert.assertEquals(20, rs.all().size());
+		synchronized (sink.updatesPending) {
+			while (sink.updatesPending.get() != 0) {
+				sink.updatesPending.wait();
+			}
+		}
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
+		try {
+			Assert.assertEquals(20, rs.all().size());
+		} catch (Throwable e) {
+			LOG.error("test failed.", e);
+		}
 	}
 
 	@Test
 	public void testCassandraBatchFormats() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
+		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
+		sink.configure(new Configuration());
+		sink.open(0, 1);
+
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.writeRecord(value);
+		}
+
+		sink.close();
+
+		InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
+		source.configure(new Configuration());
+		source.open(null);
 
-		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
-		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+		List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
 
-		env.execute("Write data");
+		while (!source.reachedEnd()) {
+			result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
+		}
 
-		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
-			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+		source.close();
 
+		try {
+			Assert.assertEquals(20, result.size());
+		} catch (Throwable e) {
+			LOG.error("test failed.", e);
+		}
+	}
 
+	protected static class TestCassandraTupleWriteAheadSink<IN extends Tuple> extends CassandraTupleWriteAheadSink<IN> {
+		private final String tableName;
 
-		long count = inputDS.count();
-		Assert.assertEquals(count, 20L);
+		private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+			super(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, tableName), serializer, builder, committer);
+			this.tableName = tableName;
+		}
 	}
 }
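startCassandra() now polls for a connection until a 30-second deadline instead of sleeping a fixed ten seconds and hoping the daemon is up. The same retry-until-deadline pattern, extracted into a standalone sketch (the Callable-based signature and all names are illustrative, not part of the test):

    import java.util.concurrent.Callable;

    public final class RetryUntilDeadline {

        // Retries the action until it succeeds or the deadline passes; the last failure is rethrown.
        static <T> T retry(Callable<T> action, long timeoutMillis, long pauseMillis) throws Exception {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                try {
                    return action.call();
                } catch (Exception e) {
                    if (System.currentTimeMillis() > deadline) {
                        throw e; // give up and surface the last error, as startCassandra() does
                    }
                    Thread.sleep(pauseMillis);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            // Toy action that fails a few times before succeeding.
            final int[] attempts = {0};
            String result = retry(() -> {
                if (++attempts[0] < 3) {
                    throw new IllegalStateException("not ready yet");
                }
                return "connected after " + attempts[0] + " attempts";
            }, 30_000, 1_000);
            System.out.println(result);
        }
    }
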
http://git-wip-us.apache.org/repos/asf/flink/blob/62523acb/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
new file mode 100644
index 0000000..2e649e4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Scanner;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+
+public class CassandraService extends TestJvmProcess {
+	private File tmpDir;
+	private File tmpCassandraYaml;
+
+	public CassandraService() throws Exception {
+		createCassandraYaml();
+		setJVMMemory(512);
+	}
+
+	private void createCassandraYaml() throws IOException {
+		// generate temporary files
+		tmpDir = CommonTestUtils.createTempDirectory();
+		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+		tmpCassandraYaml = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+
+		assertTrue(tmpCassandraYaml.createNewFile());
+		BufferedWriter b = new BufferedWriter(new FileWriter(tmpCassandraYaml));
+
+		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+		Scanner scanner = new Scanner(file);
+		while (scanner.hasNextLine()) {
+			String line = scanner.nextLine();
+			line = line.replace("$PATH", "'" + tmpCassandraYaml.getParentFile());
+			b.write(line + "\n");
+			b.flush();
+		}
+		scanner.close();
+	}
+
+	@Override
+	public String getName() {
+		return "CassandraService";
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return new String[]{
+			tmpCassandraYaml.toURI().toString(),
+			// these options were taken directly from the jvm.options file in the cassandra repo
+			"-XX:+UseThreadPriorities",
+			"-Xss256k",
+			"-XX:+AlwaysPreTouch",
+			"-XX:+UseTLAB",
+			"-XX:+ResizeTLAB",
+			"-XX:+UseNUMA",
+			"-XX:+PerfDisableSharedMem",
+			"-XX:+UseParNewGC",
+			"-XX:+UseConcMarkSweepGC",
+			"-XX:+CMSParallelRemarkEnabled",
+			"-XX:SurvivorRatio=8",
+			"-XX:MaxTenuringThreshold=1",
+			"-XX:CMSInitiatingOccupancyFraction=75",
+			"-XX:+UseCMSInitiatingOccupancyOnly",
+			"-XX:CMSWaitDuration=10000",
+			"-XX:+CMSParallelInitialMarkEnabled",
+			"-XX:+CMSEdenChunksRecordAlways",
+			"-XX:+CMSClassUnloadingEnabled",};
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return CassandraServiceEntryPoint.class.getName();
+	}
+
+	public static class CassandraServiceEntryPoint {
+		public static void main(String[] args) throws InterruptedException {
+			final CassandraDaemon cassandraDaemon = new CassandraDaemon();
+
+			System.setProperty("cassandra.config", args[0]);
+
+			cassandraDaemon.activate();
+
+			Runtime.getRuntime().addShutdownHook(new Thread() {
+				@Override
+				public void run() {
+					cassandraDaemon.deactivate();
+				}
+			});
+
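CassandraService runs the daemon in a forked JVM via Flink's TestJvmProcess, so a hanging Cassandra no longer takes the test JVM down with it. A hedged sketch of the lifecycle as the test above drives it (startProcess() and destroy() are the calls visible in CassandraConnectorITCase, inherited from TestJvmProcess; the wrapper class below is hypothetical and assumes it lives in the same package as CassandraService):

    public class CassandraServiceUsageSketch {
        public static void main(String[] args) throws Exception {
            CassandraService cassandra = new CassandraService(); // writes the templated cassandra.yaml to a temp dir
            cassandra.startProcess(); // forks a JVM that runs CassandraServiceEntryPoint
            try {
                // ... connect with the Datastax driver and run test logic here ...
            } finally {
                cassandra.destroy(); // kill the forked JVM; on a clean SIGTERM the shutdown hook deactivates the daemon
            }
        }
    }
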
+			// Run forever
+			new CountDownLatch(1).await();
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/62523acb/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
index 0594ea3..77ee0ac 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -15,29 +15,40 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+
+auto_snapshot: false
+
 cluster_name: 'Test Cluster'
-commitlog_sync: 'periodic'
-commitlog_sync_period_in_ms: 10000
-commitlog_segment_size_in_mb: 16
-partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
-endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
 commitlog_directory: $PATH/commit'
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 5000
+
 data_file_directories:
  - $PATH/data'
-saved_caches_directory: $PATH/cache'
+disk_access_mode: mmap
+
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+
 listen_address: '127.0.0.1'
-seed_provider:
-    - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
-      parameters:
-          - seeds: '127.0.0.1'
+
+memtable_allocation_type: offheap_objects
+
 native_transport_port: 9042
-concurrent_reads: 8
-concurrent_writes: 8
+partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 
-auto_bootstrap: false
-auto_snapshot: false
+read_request_timeout_in_ms: 15000
+request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
+request_scheduler_id: keyspace
+rpc_port: 9170
+saved_caches_directory: $PATH/cache'
+seed_provider:
+    - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
+      parameters:
+          - seeds: '127.0.0.1'
 start_rpc: false
 start_native_transport: true
-native_transport_max_threads: 8
+storage_port: 7010
+
+write_request_timeout_in_ms: 15000
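The template above intentionally keeps the $PATH placeholders with their unbalanced trailing quotes: CassandraService#createCassandraYaml prepends the opening quote together with the absolute temp directory when it copies the file, turning $PATH/commit' into a fully quoted path. An equivalent substitution written against java.nio, as a compact sketch (hypothetical helper, not part of the commit):

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class YamlTemplateSketch {
        // Copies the template, replacing each $PATH with a quote-opened absolute directory.
        static void materialize(Path template, Path target) throws IOException {
            File dir = target.getParent().toFile();
            StringBuilder out = new StringBuilder();
            for (String line : Files.readAllLines(template, StandardCharsets.UTF_8)) {
                // "'" + dir pairs with the trailing quote already present in the template (e.g. $PATH/commit')
                out.append(line.replace("$PATH", "'" + dir)).append('\n');
            }
            Files.write(target, out.toString().getBytes(StandardCharsets.UTF_8));
        }

        public static void main(String[] args) throws IOException {
            materialize(Paths.get("src/test/resources/cassandra.yaml"), Paths.get("/tmp/cassandra.yaml"));
        }
    }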
