[hotfix] [cassandra connector] Fix minor issues in CassandraConnectorTest. The test now properly uses and reuses a mini cluster, rather than spawning a local environment for each test. This also properly renames the CassandraConnectorTest to CassandraConnectorITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cd9bb5f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cd9bb5f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cd9bb5f Branch: refs/heads/master Commit: 7cd9bb5f1e09ad2fdbe2b7872f92432dcfbad374 Parents: 97a83a1 Author: Stephan Ewen <[email protected]> Authored: Wed Aug 31 16:54:08 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Aug 31 18:44:29 2016 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraCommitter.java | 13 +- .../connectors/cassandra/CassandraPojoSink.java | 12 +- .../cassandra/CassandraConnectorITCase.java | 460 +++++++++++++++++++ .../cassandra/CassandraConnectorTest.java | 426 ----------------- .../operators/WriteAheadSinkTestBase.java | 8 +- 5 files changed, 479 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java index 5dceb60..e83b1be 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java @@ -26,11 +26,14 @@ import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; /** * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra * database. - * <p/> - * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + * + * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| */ public class CassandraCommitter extends CheckpointCommitter { - private ClusterBuilder builder; + + private static final long serialVersionUID = 1L; + + private final ClusterBuilder builder; private transient Cluster cluster; private transient Session session; @@ -54,9 +57,6 @@ public class CassandraCommitter extends CheckpointCommitter { /** * Internally used to set the job ID after instantiation. - * - * @param id - * @throws Exception */ public void setJobId(String id) throws Exception { super.setJobId(id); @@ -66,7 +66,6 @@ public class CassandraCommitter extends CheckpointCommitter { /** * Generates the necessary tables to store information. * - * @return * @throws Exception */ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java index 204a0f3..650c481 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java @@ -23,13 +23,19 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.flink.configuration.Configuration; /** - * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which - * it uses annotations from {@link com.datastax.driver.mapping}. + * Flink Sink to save data into a Cassandra cluster using + * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>, + * which it uses annotations from + * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html"> + * com.datastax.driver.mapping.annotations</a>. * * @param <IN> Type of the elements emitted by this sink */ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> { - protected Class<IN> clazz; + + private static final long serialVersionUID = 1L; + + protected final Class<IN> clazz; protected transient Mapper<IN> mapper; protected transient MappingManager mappingManager; http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java new file mode 100644 index 0000000..9388818 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; + +import org.apache.cassandra.service.CassandraDaemon; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; + +import org.apache.flink.test.util.TestEnvironment; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.internal.AssumptionViolatedException; +import org.junit.runner.RunWith; + +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Scanner; +import java.util.UUID; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +@PowerMockIgnore("javax.management.*") +@RunWith(PowerMockRunner.class) +@PrepareForTest(ResultPartitionWriter.class) +public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); + private static File tmpDir; + + private static final boolean EMBEDDED = true; + + private static EmbeddedCassandraService cassandra; + + private static ClusterBuilder builder = new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) + .withoutJMXReporting() + .withoutMetrics().build(); + } + }; + + private static Cluster cluster; + private static Session session; + + private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; + private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; + private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; + private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; + private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; + + private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); + + static { + for (int i = 0; i < 20; i++) { + collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); + } + } + + private static class EmbeddedCassandraService { + CassandraDaemon cassandraDaemon; + + public void start() throws IOException { + this.cassandraDaemon = new CassandraDaemon(); + this.cassandraDaemon.init(null); + this.cassandraDaemon.start(); + } + + public void stop() { + this.cassandraDaemon.stop(); + } + } + + private static ForkableFlinkMiniCluster flinkCluster; + + // ------------------------------------------------------------------------ + // Cluster Setup (Cassandra & Flink) + // ------------------------------------------------------------------------ + + @BeforeClass + public static void startCassandra() throws IOException { + + // check if we should run this test, current Cassandra version requires Java >= 1.8 + try { + String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3); + float javaVersion = Float.parseFloat(javaVersionString); + Assume.assumeTrue(javaVersion >= 1.8f); + } + catch (AssumptionViolatedException e) { + System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+"); + throw e; + } + catch (Exception e) { + LOG.error("Cannot determine Java version", e); + e.printStackTrace(); + fail("Cannot determine Java version"); + } + + // generate temporary files + tmpDir = CommonTestUtils.createTempDirectory(); + ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader(); + File file = new File(classLoader.getResource("cassandra.yaml").getFile()); + File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); + + assertTrue(tmp.createNewFile()); + BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); + + //copy cassandra.yaml; inject absolute paths into cassandra.yaml + Scanner scanner = new Scanner(file); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + line = line.replace("$PATH", "'" + tmp.getParentFile()); + b.write(line + "\n"); + b.flush(); + } + scanner.close(); + + + // Tell cassandra where the configuration files are. + // Use the test configuration file. + System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); + + if (EMBEDDED) { + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + } + + try { + Thread.sleep(1000 * 10); + } catch (InterruptedException e) { //give cassandra a few seconds to start up + } + + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(CREATE_KEYSPACE_QUERY); + session.execute(CREATE_TABLE_QUERY); + } + + @BeforeClass + public static void startFlink() throws Exception { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); + + flinkCluster = new ForkableFlinkMiniCluster(config); + flinkCluster.start(); + } + + @AfterClass + public static void stopFlink() { + flinkCluster.stop(); + } + + @AfterClass + public static void closeCassandra() { + if (session != null) { + session.executeAsync(DROP_KEYSPACE_QUERY); + session.close(); + } + + if (cluster != null) { + cluster.close(); + } + + if (cassandra != null) { + cassandra.stop(); + } + + if (tmpDir != null) { + //noinspection ResultOfMethodCallIgnored + tmpDir.delete(); + } + } + + // ------------------------------------------------------------------------ + // Test preparation & cleanup + // ------------------------------------------------------------------------ + + @Before + public void initializeExecutionEnvironment() { + TestStreamEnvironment.setAsContext(flinkCluster, 4); + new TestEnvironment(flinkCluster, 4, false).setAsContext(); + } + + @After + public void deleteSchema() throws Exception { + session.executeAsync(CLEAR_TABLE_QUERY); + } + + // ------------------------------------------------------------------------ + // Exactly-once Tests + // ------------------------------------------------------------------------ + + @Override + protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception { + return new CassandraTupleWriteAheadSink<>( + INSERT_DATA_QUERY, + TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), + builder, + new CassandraCommitter(builder)); + } + + @Override + protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() { + return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class); + } + + @Override + protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) { + return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); + } + + @Override + protected void verifyResultsIdealCircumstances( + OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, + OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Override + protected void verifyResultsDataPersistenceUponMissedNotify( + OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, + OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Override + protected void verifyResultsDataDiscardingUponRestore( + OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, + OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 20; x++) { + list.add(x); + } + for (int x = 41; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Test + public void testCassandraCommitter() throws Exception { + CassandraCommitter cc1 = new CassandraCommitter(builder); + cc1.setJobId("job"); + cc1.setOperatorId("operator"); + cc1.setOperatorSubtaskId(0); + + CassandraCommitter cc2 = new CassandraCommitter(builder); + cc2.setJobId("job"); + cc2.setOperatorId("operator"); + cc2.setOperatorSubtaskId(1); + + CassandraCommitter cc3 = new CassandraCommitter(builder); + cc3.setJobId("job"); + cc3.setOperatorId("operator1"); + cc3.setOperatorSubtaskId(0); + + cc1.createResource(); + + cc1.open(); + cc2.open(); + cc3.open(); + + Assert.assertFalse(cc1.isCheckpointCommitted(1)); + Assert.assertFalse(cc2.isCheckpointCommitted(1)); + Assert.assertFalse(cc3.isCheckpointCommitted(1)); + + cc1.commitCheckpoint(1); + Assert.assertTrue(cc1.isCheckpointCommitted(1)); + //verify that other sub-tasks aren't affected + Assert.assertFalse(cc2.isCheckpointCommitted(1)); + //verify that other tasks aren't affected + Assert.assertFalse(cc3.isCheckpointCommitted(1)); + + Assert.assertFalse(cc1.isCheckpointCommitted(2)); + + cc1.close(); + cc2.close(); + cc3.close(); + + cc1 = new CassandraCommitter(builder); + cc1.setJobId("job"); + cc1.setOperatorId("operator"); + cc1.setOperatorSubtaskId(0); + + cc1.open(); + + //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data + Assert.assertTrue(cc1.isCheckpointCommitted(1)); + Assert.assertFalse(cc1.isCheckpointCommitted(2)); + + cc1.close(); + } + + // ------------------------------------------------------------------------ + // At-least-once Tests + // ------------------------------------------------------------------------ + + @Test + public void testCassandraTupleAtLeastOnceSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection); + source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + + env.execute(); + + ResultSet rs = session.execute(SELECT_DATA_QUERY); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraPojoAtLeastOnceSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource<Pojo> source = env + .addSource(new SourceFunction<Pojo>() { + + private boolean running = true; + private volatile int cnt = 0; + + @Override + public void run(SourceContext<Pojo> ctx) throws Exception { + while (running) { + ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); + cnt++; + if (cnt == 20) { + cancel(); + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); + + env.execute(); + + ResultSet rs = session.execute(SELECT_DATA_QUERY); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraBatchFormats() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection); + dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + + env.execute("Write data"); + + DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( + new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), + TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){})); + + + long count = inputDS.count(); + Assert.assertEquals(count, 20L); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java deleted file mode 100644 index 2018255..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java +++ /dev/null @@ -1,426 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; - -import org.apache.cassandra.service.CassandraDaemon; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.internal.AssumptionViolatedException; -import org.junit.runner.RunWith; - -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Scanner; -import java.util.UUID; - -import static org.junit.Assert.*; - -@SuppressWarnings("serial") -@RunWith(PowerMockRunner.class) -@PrepareForTest(ResultPartitionWriter.class) -public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { - - private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class); - private static File tmpDir; - - private static final boolean EMBEDDED = true; - - private static EmbeddedCassandraService cassandra; - - private static ClusterBuilder builder = new ClusterBuilder() { - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - return builder - .addContactPoint("127.0.0.1") - .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) - .withoutJMXReporting() - .withoutMetrics().build(); - } - }; - - private static Cluster cluster; - private static Session session; - - private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; - private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; - private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; - private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; - private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; - - private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); - - static { - for (int i = 0; i < 20; i++) { - collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); - } - } - - private static class EmbeddedCassandraService { - CassandraDaemon cassandraDaemon; - - public void start() throws IOException { - this.cassandraDaemon = new CassandraDaemon(); - this.cassandraDaemon.init(null); - this.cassandraDaemon.start(); - } - - public void stop() { - this.cassandraDaemon.stop(); - } - } - - // ------------------------------------------------------------------------ - // Cassandra Cluster Setup - // ------------------------------------------------------------------------ - - @BeforeClass - public static void startCassandra() throws IOException { - - // check if we should run this test, current Cassandra version requires Java >= 1.8 - try { - String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3); - float javaVersion = Float.parseFloat(javaVersionString); - Assume.assumeTrue(javaVersion >= 1.8f); - } - catch (AssumptionViolatedException e) { - System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+"); - throw e; - } - catch (Exception e) { - LOG.error("Cannot determine Java version", e); - e.printStackTrace(); - fail("Cannot determine Java version"); - } - - // generate temporary files - tmpDir = CommonTestUtils.createTempDirectory(); - ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader(); - File file = new File(classLoader.getResource("cassandra.yaml").getFile()); - File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); - - assertTrue(tmp.createNewFile()); - BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); - - //copy cassandra.yaml; inject absolute paths into cassandra.yaml - Scanner scanner = new Scanner(file); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); - line = line.replace("$PATH", "'" + tmp.getParentFile()); - b.write(line + "\n"); - b.flush(); - } - scanner.close(); - - - // Tell cassandra where the configuration files are. - // Use the test configuration file. - System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); - - if (EMBEDDED) { - cassandra = new EmbeddedCassandraService(); - cassandra.start(); - } - - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { //give cassandra a few seconds to start up - } - - cluster = builder.getCluster(); - session = cluster.connect(); - - session.execute(CREATE_KEYSPACE_QUERY); - session.execute(CREATE_TABLE_QUERY); - } - - @Before - public void checkIfIgnore() { - - } - - @After - public void deleteSchema() throws Exception { - session.executeAsync(CLEAR_TABLE_QUERY); - } - - @AfterClass - public static void closeCassandra() { - if (session != null) { - session.executeAsync(DROP_KEYSPACE_QUERY); - session.close(); - } - - if (cluster != null) { - cluster.close(); - } - - if (cassandra != null) { - cassandra.stop(); - } - - if (tmpDir != null) { - //noinspection ResultOfMethodCallIgnored - tmpDir.delete(); - } - } - - //=====Exactly-Once================================================================================================= - @Override - protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception { - return new CassandraTupleWriteAheadSink<>( - INSERT_DATA_QUERY, - TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), - builder, - new CassandraCommitter(builder)); - } - - @Override - protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() { - return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class); - } - - @Override - protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) { - return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); - } - - @Override - protected void verifyResultsIdealCircumstances( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Override - protected void verifyResultsDataPersistenceUponMissedNotify( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Override - protected void verifyResultsDataDiscardingUponRestore( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 20; x++) { - list.add(x); - } - for (int x = 41; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Test - public void testCassandraCommitter() throws Exception { - CassandraCommitter cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); - cc1.setOperatorId("operator"); - cc1.setOperatorSubtaskId(0); - - CassandraCommitter cc2 = new CassandraCommitter(builder); - cc2.setJobId("job"); - cc2.setOperatorId("operator"); - cc2.setOperatorSubtaskId(1); - - CassandraCommitter cc3 = new CassandraCommitter(builder); - cc3.setJobId("job"); - cc3.setOperatorId("operator1"); - cc3.setOperatorSubtaskId(0); - - cc1.createResource(); - - cc1.open(); - cc2.open(); - cc3.open(); - - Assert.assertFalse(cc1.isCheckpointCommitted(1)); - Assert.assertFalse(cc2.isCheckpointCommitted(1)); - Assert.assertFalse(cc3.isCheckpointCommitted(1)); - - cc1.commitCheckpoint(1); - Assert.assertTrue(cc1.isCheckpointCommitted(1)); - //verify that other sub-tasks aren't affected - Assert.assertFalse(cc2.isCheckpointCommitted(1)); - //verify that other tasks aren't affected - Assert.assertFalse(cc3.isCheckpointCommitted(1)); - - Assert.assertFalse(cc1.isCheckpointCommitted(2)); - - cc1.close(); - cc2.close(); - cc3.close(); - - cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); - cc1.setOperatorId("operator"); - cc1.setOperatorSubtaskId(0); - - cc1.open(); - - //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data - Assert.assertTrue(cc1.isCheckpointCommitted(1)); - Assert.assertFalse(cc1.isCheckpointCommitted(2)); - - cc1.close(); - } - - //=====At-Least-Once================================================================================================ - @Test - public void testCassandraTupleAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection); - source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); - - env.execute(); - - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); - } - - @Test - public void testCassandraPojoAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStreamSource<Pojo> source = env - .addSource(new SourceFunction<Pojo>() { - - private boolean running = true; - private volatile int cnt = 0; - - @Override - public void run(SourceContext<Pojo> ctx) throws Exception { - while (running) { - ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); - cnt++; - if (cnt == 20) { - cancel(); - } - } - } - - @Override - public void cancel() { - running = false; - } - }); - - source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); - - env.execute(); - - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); - } - - @Test - public void testCassandraBatchFormats() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection); - dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); - - env.execute("Write data"); - - DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( - new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), - TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){})); - - - long count = inputDS.count(); - Assert.assertEquals(count, 20L); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java index e3df9fc..221d7da 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java @@ -25,9 +25,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.apache.flink.util.TestLogger; + import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -40,10 +41,9 @@ import java.util.ArrayList; @RunWith(PowerMockRunner.class) @PrepareForTest(ResultPartitionWriter.class) -@PowerMockIgnore("javax.management.*") -public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> { +public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger { - protected class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> { + protected static class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> { public OneInputStreamOperator<INT, INT> getOperator() { return this.headOperator; }
