Repository: flink

Updated Branches:
  refs/heads/master 611412c6b -> 02c10d312
Revert "[FLINK-4177] Harden CassandraConnectorITCase" This reverts commit 62523acbe175cf159fe1b4ab6cf5c0412fc4d232. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02c10d31 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02c10d31 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02c10d31 Branch: refs/heads/master Commit: 02c10d312371aaad12ed8961cdf96288fe78a983 Parents: 95d640b Author: Stephan Ewen <[email protected]> Authored: Wed Nov 16 17:59:15 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Nov 16 19:08:07 2016 +0100 ---------------------------------------------------------------------- .../flink-connector-cassandra/pom.xml | 11 +- .../connectors/cassandra/CassandraSinkBase.java | 39 +- .../cassandra/CassandraConnectorITCase.java | 374 +++++++++++-------- .../connectors/cassandra/CassandraService.java | 118 ------ .../src/test/resources/cassandra.yaml | 41 +- 5 files changed, 232 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml index 07cdc09..3a1731c 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml +++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml @@ -37,8 +37,8 @@ under the License. <!-- Allow users to pass custom connector versions --> <properties> - <cassandra.version>2.2.7</cassandra.version> - <driver.version>3.0.3</driver.version> + <cassandra.version>2.2.5</cassandra.version> + <driver.version>3.0.0</driver.version> </properties> <build> @@ -159,13 +159,6 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> - <!-- we need this dependency for the EmbeddedCassandraService--> - <dependency> - <groupId>org.caffinitas.ohc</groupId> - <artifactId>ohc-core</artifactId> - <version>0.4.5</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.cassandra</groupId> <artifactId>cassandra-all</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 9c4c430..49b1efa 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -29,8 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; /** * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}. 
@@ -42,13 +40,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
-	protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+	protected transient Throwable exception = null;
 	protected transient FutureCallback<V> callback;

 	private final ClusterBuilder builder;

-	protected final AtomicInteger updatesPending = new AtomicInteger();
-
 	protected CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
@@ -59,24 +55,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
 			}

 			@Override
 			public void onFailure(Throwable t) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
-				exception.set(t);
-
+				exception = t;
 				LOG.error("Error while sending value.", t);
 			}
 		};
@@ -86,12 +69,10 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {

 	@Override
 	public void invoke(IN value) throws Exception {
-		Throwable e = exception.get();
-		if (e != null) {
-			throw new IOException("Error while sending value.", e);
+		if (exception != null) {
+			throw new IOException("invoke() failed", exception);
 		}
 		ListenableFuture<V> result = send(value);
-		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
 	}

@@ -99,14 +80,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {

 	@Override
 	public void close() {
-		while (updatesPending.get() > 0) {
-			synchronized (updatesPending) {
-				try {
-					updatesPending.wait();
-				} catch (InterruptedException e) {
-				}
-			}
-		}
 		try {
 			if (session != null) {
 				session.close();
@@ -121,9 +94,5 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		} catch (Exception e) {
 			LOG.error("Error while closing cluster.", e);
 		}
-		Throwable e = exception.get();
-		if (e != null) {
-			LOG.error("Error while sending value.", e);
-		}
 	}
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 258ef52..2bb6fd1 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -20,138 +20,192 @@ package org.apache.flink.streaming.connectors.cassandra;

 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SocketOptions;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.TestEnvironment;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
+import java.util.Scanner;
 import java.util.UUID;

+import static org.junit.Assert.*;
+
 @SuppressWarnings("serial")
-public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {

 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-
-	private static final String TABLE_NAME_PREFIX = "flink_";
-	private static final String TABLE_NAME_VARIABLE = "$TABLE";
-	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-	private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
-	private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
-	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+	private static File tmpDir;

 	private static final boolean EMBEDDED = true;

-	private static final ClusterBuilder builder = new ClusterBuilder() {
+	private static EmbeddedCassandraService cassandra;
+
+	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
 				.addContactPoint("127.0.0.1")
-				.withSocketOptions(new SocketOptions().setConnectTimeoutMillis(30000))
 				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-				.withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 32, 32).setMaxRequestsPerConnection(HostDistance.LOCAL, 2048).setPoolTimeoutMillis(15000))
 				.withoutJMXReporting()
 				.withoutMetrics().build();
 		}
 	};

-	private static final List<Tuple3<String, Integer, Integer>> collection = new ArrayList<>();
-
-	private static CassandraService cassandra;
 	private static Cluster cluster;
 	private static Session session;

-	private static final Random random = new Random();
-	private int tableID;
+	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";

-	@BeforeClass
-	public static void generateCollection() {
+	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+
+	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
 		}
 	}

+	private static class EmbeddedCassandraService {
+		CassandraDaemon cassandraDaemon;
+
+		public void start() throws IOException {
+			this.cassandraDaemon = new CassandraDaemon();
+			this.cassandraDaemon.init(null);
+			this.cassandraDaemon.start();
+		}
+
+		public void stop() {
+			this.cassandraDaemon.stop();
+		}
+	}
+
+	private static LocalFlinkMiniCluster flinkCluster;
+
+	// ------------------------------------------------------------------------
+	//  Cluster Setup (Cassandra & Flink)
+	// ------------------------------------------------------------------------
+
 	@BeforeClass
 	public static void startCassandra() throws IOException {

 		// check if we should run this test, current Cassandra version requires Java >= 1.8
-		CommonTestUtils.assumeJava8();
-
-		try {
-			cassandra = new CassandraService();
-		} catch (Exception e) {
-			LOG.error("Failed to instantiate cassandra service.", e);
-			Assert.fail("Failed to instantiate cassandra service.");
+		org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
+
+		// generate temporary files
+		tmpDir = CommonTestUtils.createTempDirectory();
+		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+
+		assertTrue(tmp.createNewFile());
+
+		try (
+			BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+			//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+			Scanner scanner = new Scanner(file);
+		) {
+			while (scanner.hasNextLine()) {
+				String line = scanner.nextLine();
+				line = line.replace("$PATH", "'" + tmp.getParentFile());
+				b.write(line + "\n");
+				b.flush();
+			}
 		}
+
+		// Tell cassandra where the configuration files are.
+		// Use the test configuration file.
+		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+
 		if (EMBEDDED) {
-			cassandra.startProcess();
+			cassandra = new EmbeddedCassandraService();
+			cassandra.start();
 		}

-		long start = System.currentTimeMillis();
-		long deadline = start + 1000 * 30;
-		while (true) {
-			try {
-				cluster = builder.getCluster();
-				session = cluster.connect();
-				break;
-			} catch (Exception e) {
-				if (System.currentTimeMillis() > deadline) {
-					throw e;
-				}
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException ignored) {
-				}
-			}
+		try {
+			Thread.sleep(1000 * 10);
+		} catch (InterruptedException e) { //give cassandra a few seconds to start up
 		}
-		LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
+
+		cluster = builder.getCluster();
+		session = cluster.connect();

 		session.execute(CREATE_KEYSPACE_QUERY);
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
+		session.execute(CREATE_TABLE_QUERY);
 	}

-	@Before
-	public void createTable() {
-		tableID = random.nextInt(Integer.MAX_VALUE);
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
+	@BeforeClass
+	public static void startFlink() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+		flinkCluster = new LocalFlinkMiniCluster(config);
+		flinkCluster.start();
+	}
+
+	@AfterClass
+	public static void stopFlink() {
+		if (flinkCluster != null) {
+			flinkCluster.stop();
+			flinkCluster = null;
+		}
 	}

 	@AfterClass
 	public static void closeCassandra() {
 		if (session != null) {
+			session.executeAsync(DROP_KEYSPACE_QUERY);
 			session.close();
 		}

@@ -159,11 +213,29 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			cluster.close();
 		}

-		if (EMBEDDED) {
-			if (cassandra != null) {
-				cassandra.destroy();
-			}
+		if (cassandra != null) {
+			cassandra.stop();
 		}
+
+		if (tmpDir != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tmpDir.delete();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test preparation & cleanup
+	// ------------------------------------------------------------------------
+
+	@Before
+	public void initializeExecutionEnvironment() {
+		TestStreamEnvironment.setAsContext(flinkCluster, 4);
+		new TestEnvironment(flinkCluster, 4, false).setAsContext();
+	}
+
+	@After
+	public void deleteSchema() throws Exception {
+		session.executeAsync(CLEAR_TABLE_QUERY);
 	}

 	// ------------------------------------------------------------------------
@@ -171,9 +243,9 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	// ------------------------------------------------------------------------

 	@Override
-	protected TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-		return new TestCassandraTupleWriteAheadSink<>(
-			TABLE_NAME_PREFIX + tableID,
+	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+		return new CassandraTupleWriteAheadSink<>(
+			INSERT_DATA_QUERY,
 			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
 			builder,
 			new CassandraCommitter(builder));
@@ -192,42 +264,43 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsIdealCircumstances(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}

 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

 	@Override
 	protected void verifyResultsDataPersistenceUponMissedNotify(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {

-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}

 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

 	@Override
 	protected void verifyResultsDataDiscardingUponRestore(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 20; x++) {
 			list.add(x);
@@ -237,24 +310,23 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}

 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}

 	@Test
 	public void testCassandraCommitter() throws Exception {
-		String jobID = new JobID().toString();
-		CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc1.setJobId(jobID);
+		CassandraCommitter cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
 		cc1.setOperatorId("operator");

-		CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc2.setJobId(jobID);
+		CassandraCommitter cc2 = new CassandraCommitter(builder);
+		cc2.setJobId("job");
 		cc2.setOperatorId("operator");

-		CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc3.setJobId(jobID);
+		CassandraCommitter cc3 = new CassandraCommitter(builder);
+		cc3.setJobId("job");
 		cc3.setOperatorId("operator1");

 		cc1.createResource();
@@ -280,17 +352,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		cc2.close();
 		cc3.close();

-		CassandraCommitter cc4 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc4.setJobId(jobID);
-		cc4.setOperatorId("operator");
+		cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");

-		cc4.open();
+		cc1.open();

 		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-		Assert.assertTrue(cc4.isCheckpointCommitted(0, 1));
-		Assert.assertFalse(cc4.isCheckpointCommitted(0, 2));
+		Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
+		Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));

-		cc4.close();
+		cc1.close();
 	}

 	// ------------------------------------------------------------------------
@@ -299,94 +371,70 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri

 	@Test
 	public void testCassandraTupleAtLeastOnceSink() throws Exception {
-		CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);

-		sink.open(new Configuration());
+		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
+		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));

-		for (Tuple3<String, Integer, Integer> value : collection) {
-			sink.send(value);
-		}
-
-		sink.close();
-
-		synchronized (sink.updatesPending) {
-			if (sink.updatesPending.get() != 0) {
-				sink.updatesPending.wait();
-			}
-		}
+		env.execute();

-		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
-		try {
-			Assert.assertEquals(20, rs.all().size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
 	}

 	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-
-		CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
-
-		sink.open(new Configuration());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Pojo> source = env
+			.addSource(new SourceFunction<Pojo>() {
+
+				private boolean running = true;
+				private volatile int cnt = 0;
+
+				@Override
+				public void run(SourceContext<Pojo> ctx) throws Exception {
+					while (running) {
+						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
+						cnt++;
+						if (cnt == 20) {
+							cancel();
+						}
+					}
+				}

-		for (int x = 0; x < 20; x++) {
-			sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-		}
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
-		sink.close();
+		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));

-		synchronized (sink.updatesPending) {
-			while (sink.updatesPending.get() != 0) {
-				sink.updatesPending.wait();
-			}
-		}
+		env.execute();

-		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-		try {
-			Assert.assertEquals(20, rs.all().size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
 	}

 	@Test
 	public void testCassandraBatchFormats() throws Exception {
-		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-		sink.configure(new Configuration());
-		sink.open(0, 1);
-
-		for (Tuple3<String, Integer, Integer> value : collection) {
-			sink.writeRecord(value);
-		}
-
-		sink.close();
-
-		InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-		source.configure(new Configuration());
-		source.open(null);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);

-		List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
+		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));

-		while (!source.reachedEnd()) {
-			result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
-		}
+		env.execute("Write data");

-		source.close();
-		try {
-			Assert.assertEquals(20, result.size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
-	}
+		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
+			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
+			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));

-	protected static class TestCassandraTupleWriteAheadSink<IN extends Tuple> extends CassandraTupleWriteAheadSink<IN> {
-		private final String tableName;
-		private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
-			super(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, tableName), serializer, builder, committer);
-			this.tableName = tableName;
-		}
+		long count = inputDS.count();
+		Assert.assertEquals(count, 20L);
 	}
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
deleted file mode 100644
index 2e649e4..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.TestJvmProcess;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Scanner;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-
-public class CassandraService extends TestJvmProcess {
-	private File tmpDir;
-	private File tmpCassandraYaml;
-
-	public CassandraService() throws Exception {
-		createCassandraYaml();
-		setJVMMemory(512);
-	}
-
-	private void createCassandraYaml() throws IOException {
-		// generate temporary files
-		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
-		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-		tmpCassandraYaml = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
-		assertTrue(tmpCassandraYaml.createNewFile());
-		BufferedWriter b = new BufferedWriter(new FileWriter(tmpCassandraYaml));
-
-		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
-		Scanner scanner = new Scanner(file);
-		while (scanner.hasNextLine()) {
-			String line = scanner.nextLine();
-			line = line.replace("$PATH", "'" + tmpCassandraYaml.getParentFile());
-			b.write(line + "\n");
-			b.flush();
-		}
-		scanner.close();
-	}
-
-	@Override
-	public String getName() {
-		return "CassandraService";
-	}
-
-	@Override
-	public String[] getJvmArgs() {
-		return new String[]{
-			tmpCassandraYaml.toURI().toString(),
-			// these options were taken directly from the jvm.options file in the cassandra repo
-			"-XX:+UseThreadPriorities",
-			"-Xss256k",
-			"-XX:+AlwaysPreTouch",
-			"-XX:+UseTLAB",
-			"-XX:+ResizeTLAB",
-			"-XX:+UseNUMA",
-			"-XX:+PerfDisableSharedMem",
-			"-XX:+UseParNewGC",
-			"-XX:+UseConcMarkSweepGC",
-			"-XX:+CMSParallelRemarkEnabled",
-			"-XX:SurvivorRatio=8",
-			"-XX:MaxTenuringThreshold=1",
-			"-XX:CMSInitiatingOccupancyFraction=75",
-			"-XX:+UseCMSInitiatingOccupancyOnly",
-			"-XX:CMSWaitDuration=10000",
-			"-XX:+CMSParallelInitialMarkEnabled",
-			"-XX:+CMSEdenChunksRecordAlways",
-			"-XX:+CMSClassUnloadingEnabled",};
-	}
-
-	@Override
-	public String getEntryPointClassName() {
-		return CassandraServiceEntryPoint.class.getName();
-	}
-
-	public static class CassandraServiceEntryPoint {
-		public static void main(String[] args) throws InterruptedException {
-			final CassandraDaemon cassandraDaemon = new CassandraDaemon();
-
-			System.setProperty("cassandra.config", args[0]);
-
-			cassandraDaemon.activate();
-
-			Runtime.getRuntime().addShutdownHook(new Thread() {
-				@Override
-				public void run() {
-					cassandraDaemon.deactivate();
-				}
-			});
-
-			// Run forever
-			new CountDownLatch(1).await();
-		}
-
-	}
-}


http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
index 77ee0ac..0594ea3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -15,40 +15,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-auto_snapshot: false
-
 cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
 commitlog_directory: $PATH/commit'
-commitlog_sync: periodic
-commitlog_sync_period_in_ms: 5000
-
 data_file_directories:
  - $PATH/data'
-disk_access_mode: mmap
-
-endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
-
-listen_address: '127.0.0.1'
-
-memtable_allocation_type: offheap_objects
-
-native_transport_port: 9042
-
-partitioner: org.apache.cassandra.dht.Murmur3Partitioner
-
-read_request_timeout_in_ms: 15000
-request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
-request_scheduler_id: keyspace
-rpc_port: 9170
-
 saved_caches_directory: $PATH/cache'
+listen_address: '127.0.0.1'
 seed_provider:
     - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
      parameters:
          - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
 start_rpc: false
 start_native_transport: true
-storage_port: 7010
-
-write_request_timeout_in_ms: 15000
+native_transport_max_threads: 8
