[
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151910#comment-15151910
]
ASF GitHub Bot commented on FLINK-3311:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/1660#discussion_r53282016
--- Diff:
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/CassandraConnectorTest.java
---
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.cassandra;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Scanner;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.cassandra.batch.CassandraInputFormat;
+import org.apache.flink.connectors.cassandra.batch.CassandraOutputFormat;
+import org.apache.flink.connectors.cassandra.streaming.CassandraMapperSink;
+import org.apache.flink.connectors.cassandra.streaming.CassandraSink;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import com.datastax.driver.core.ResultSet;
+import org.apache.zookeeper.KeeperException;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Pojo.class})
+@PowerMockIgnore("javax.management.*")
+public class CassandraConnectorTest extends
StreamingMultipleProgramsTestBase {
+
+
+ private static File tmpDir;
+
+ private static EmbeddedCassandraService cassandra;
+ private static Cluster.Builder builder;
+ private static Session session;
+
+ private static final long COUNT = 20;
+
+ //
+ // QUERY
+ //
+ private static final String CREATE_KEYSPACE = "CREATE KEYSPACE test
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };";
+ private static final String DROP_KEYSPACE = "DROP KEYSPACE test;";
+
+ private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS
test.tuplesink(id bigint PRIMARY KEY, value text);";
+ private static final String INSERT_QUERY = "INSERT INTO test.tuplesink
(id,value) VALUES (?,?);";
+ private static final String SELECT_QUERY = "SELECT * FROM
test.tuplesink;";
+ private static final String DROP_TABLE = "DROP TABLE test.tuplesink;";
+
+ private static final String CREATE_TABLE_MAPPER = "CREATE TABLE IF NOT
EXISTS test.mappersink(id bigint,value text,PRIMARY KEY(id))";
+ private static final String SELECT_QUERY_MAPPER = "SELECT * FROM
test.mappersink;";
+ private static final String DROP_TABLE_MAPPER = "DROP TABLE
test.mappersink;";
+
+ private static final ArrayList<Tuple2<Long,String>> collection = new
ArrayList<>(20);
+ static {
+ for (long i = 0; i < 20; i++) {
+ collection.add(new Tuple2<>(i, "cassandra-" + i));
+ }
+ }
+
+
+ private static class EmbeddedCassandraService {
+ CassandraDaemon cassandraDaemon;
+
+ public void start() throws IOException {
+ this.cassandraDaemon = new CassandraDaemon();
+ this.cassandraDaemon.init(null);
+ this.cassandraDaemon.start();
+ }
+
+ public void stop() {
+ this.cassandraDaemon.stop();
+ }
+ }
+
+ @BeforeClass
+ public static void startCassandra() throws IOException {
+
+ //generate temporary files
+ tmpDir = CommonTestUtils.createTempDirectory();
+ ClassLoader classLoader =
CassandraConnectorTest.class.getClassLoader();
+ File file = new
File(classLoader.getResource("cassandra.yaml").getFile());
+ File tmp = new File(tmpDir.getAbsolutePath() + File.separator +
"cassandra.yaml");
+ tmp.createNewFile();
+ BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+ //copy cassandra.yaml; inject absolute paths into cassandra.yaml
+ Scanner scanner = new Scanner(file);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ line = line.replace("$PATH", "'" + tmp.getParentFile());
+ b.write(line + "\n");
+ b.flush();
+ }
+ scanner.close();
+
+
+ // Tell cassandra where the configuration files are.
+ // Use the test configuration file.
+ System.setProperty("cassandra.config", "file:" + File.separator
+ File.separator + File.separator + tmp.getAbsolutePath());
+
+
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+
+ builder = Cluster.builder().addContactPoint("127.0.0.1");
+ session = builder.build().connect();
+
+ session.execute(CREATE_KEYSPACE);
+ }
+
+ //
+ // CassandraSink.java
+ //
+
+ @Test(expected = IllegalArgumentException.class)
+ public void queryNotSet() {
+ new CassandraSink<Tuple2<Long, String>>(null) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return CassandraConnectorTest.builder;
+ }
+ };
+ }
+
+ @Test
+ public void cassandraSink() throws Exception {
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<Tuple2<Long, String>> source =
env.fromCollection(collection);
+
+ source.addSink(new CassandraSink<Tuple2<Long,
String>>(CREATE_TABLE, INSERT_QUERY) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return CassandraConnectorTest.builder;
+ }
+ });
+
+ env.execute();
+
+ ResultSet rs = session.execute(SELECT_QUERY);
+ Assert.assertEquals(COUNT, rs.all().size());
+ session.execute(DROP_TABLE);
+ }
+
+ //
+ @Test(expected = JobExecutionException.class)
+ public void runtimeExceptionCreateQuery() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+
+ DataStreamSource<Tuple2<Long, String>> source =
env.fromCollection(collection);
+ source.addSink(new CassandraSink<Tuple2<Long,
String>>(CREATE_TABLE, INSERT_QUERY) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return CassandraConnectorTest.builder;
+ }
+ });
+
+
+ env.execute();
+
+ }
+
+ //
+ // CassandraMapperSink.java
+ //
+
+ @Test(expected = NullPointerException.class)
+ public void clazzNotSet() {
+
+ class Foo implements Serializable{
+ }
+ new CassandraMapperSink<Foo>(null) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return CassandraConnectorTest.builder;
+ }
+ };
+ }
+
+ @Test
+ public void cassandraMapperSink() throws Exception {
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStreamSource<Pojo> source = env
+ .addSource(new SourceFunction<Pojo>() {
+
+ private boolean running = true;
+ private volatile long cnt = 0;
+
+ @Override
+ public void run(SourceContext<Pojo> ctx) throws
Exception {
+ while (running) {
+ ctx.collect(new Pojo(cnt, "cassandra-" + cnt));
+ cnt++;
+ if (cnt == COUNT) {
+ cancel();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ source.addSink(new
CassandraMapperSink<Pojo>(CREATE_TABLE_MAPPER,Pojo.class) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return CassandraConnectorTest.builder;
+ }
+ });
+
+ env.execute();
+
+ ResultSet rs = session.execute(SELECT_QUERY_MAPPER);
+ Assert.assertEquals(COUNT,rs.all().size());
+ session.execute(DROP_TABLE_MAPPER);
+ }
+
+
+ //
+ // CassandraOutputFormat.java
+ // CassandraInputFormat.java
+ //
+
+ @Test
+ public void batch(){
+
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long,String>> dataSet =
env.fromCollection(collection);
+
+ dataSet.output(new
CassandraOutputFormat<Tuple2<Long,String>>(CREATE_TABLE,INSERT_QUERY) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return builder;
+ }
+ });
+
+ DataSet<Tuple2<Long,String>> inputDS = env.createInput(new
CassandraInputFormat<Tuple2<Long,String>>(SELECT_QUERY) {
+
+ @Override
+ public Builder configureCluster(Builder cluster) {
+ return builder;
+ }
+ });
+
+ try {
+ long count =inputDS.count();
+ Assert.assertEquals(count, 20L);
+ session.execute(DROP_TABLE);
+ } catch (Exception e) {
--- End diff --
you can just throw this exception
> Add a connector for streaming data into Cassandra
> -------------------------------------------------
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Robert Metzger
> Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance
> (for the tests):
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard
> java types to the respective cassandra types.
> In addition, it seems that there is a object mapper from datastax to store
> POJOs in Cassandra (there are annotations for defining the primary key and
> types)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)