Hi,

While writing data to cassandra from spark, data is not getting
written. The flash back is:
I am doing a kafka-sparkStreaming-cassandra integration.
I am reading kafka messages and trying to put it in a cassandra table
CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT).
kafka to spark-streaming is running cool, but spark to cassandra,
there is some issue...data not getting written to table.
I am able to create a connection with cassandra, but the data is not
getting inserted into the cassandra table.
and the logs show its getting connected and the next second getting
disconnected.
the full code and the logs and dependencies are below:

public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {

        if(args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]",
"SparkStream",conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new
Duration(5000));


        /* connection to cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++++++++++cassandra connector
created++++++++++++++++++++++++++++");


        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
        System.out.println("+++++++++++++streaming Connection
done!+++++++++++++++++++++++++++");


        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<
Tuple2<String,String>, TestTable >()
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2() );
            }
        }
        );
        System.out.println("++++++++++++++++JavaDStream<TestTable>
created++++++++++++++++++++++++++++");


        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace",
"test_table", mapToRow(TestTable.class)).saveToCassandra();


        jssc.start();
        jssc.awaitTermination();

    }
}

class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key=k;
        value=v;
    }

    public Integer getKey(){
        return key;
    }

    public void setKey(Integer k){
        key=k;
    }

    public String getValue(){
        return value;
    }

    public void setValue(String v){
        value=v;
    }

    public String toString(){
        return MessageFormat.format("TestTable'{'key={0},
value={1}'}'", key, value);

    }
}

The log is:
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host
localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra
cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding
host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding
host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from
Cassandra cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host
localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra
cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding
host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding
host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from
Cassandra cluster: Test Cluster

The POM.xml dependencies are:
   <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>


    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.3</version>
</dependency>

Reply via email to