Hi Kali,

If you do not mind sending JSON, you could do something like this, using
json4s:


val rows = p.collect().map(row => TestTable(row.getString(0), row.getString(1)))

val json = parse(write(rows))

producer.send(new KeyedMessage[String, String]("trade", writePretty(json)))

// or send each row as its own message
for (row <- rows) {
  producer.send(new KeyedMessage[String, String]("trade", writePretty(parse(write(row)))))
}
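
The first send above publishes the entire result set as one Kafka message, while the loop publishes one message per row; per-row messages are usually easier for downstream consumers to process incrementally, at the cost of more sends.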

Just make sure you import the following, and have an implicit Formats in scope (write and writePretty will not compile without one):

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{ read, write, writePretty }

implicit val formats: Formats = Serialization.formats(NoTypeHints)
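
For reference, here is a minimal, self-contained sketch of just the JSON part (the sample TestTable values are made up for illustration, and the Kafka producer is left out):

import org.json4s._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.writePretty

case class TestTable(TRADE_ID: String, TRADE_PRICE: String)

object JsonSketch {
  // NoTypeHints is enough for plain case classes like TestTable
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val rows = Seq(TestTable("1", "100.50"), TestTable("2", "99.75"))
    // writePretty serializes the collection directly, so the intermediate
    // parse(write(...)) round-trip is not strictly required
    println(writePretty(rows))
  }
}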


On Wed, Sep 23, 2015 at 12:26 PM, kali.tumm...@gmail.com <kali.tumm...@gmail.com> wrote:

> Guys, sorry, I figured it out:
>
> val x = p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
>
> Full code:
>
> package com.examples
>
> /**
>  * Created by kalit_000 on 22/09/2015.
>  */
>
> import java.util.Properties
>
> import _root_.kafka.serializer.StringDecoder
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object SparkProducerDBCassandra {
>
>   case class TestTable(TRADE_ID: String, TRADE_PRICE: String)
>
>   def main(args: Array[String]): Unit =
>   {
>     Logger.getLogger("org").setLevel(Level.WARN)
>     Logger.getLogger("akka").setLevel(Level.WARN)
>
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("testkali2")
>       .set("spark.cassandra.connection.host", "127.0.0.1")
>     // passing master/appName to the SparkContext constructor again would
>     // silently override the settings above
>     val sc = new SparkContext(conf)
>     //val ssc= new StreamingContext(sc,Seconds(2))
>
>     print("Test kali Spark Cassandra")
>
>     val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
>
>     val p = cc.sql("select * from people.person")
>
>     p.collect().foreach(println)
>
>     val props: Properties = new Properties()
>     props.put("metadata.broker.list", "localhost:9092")
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val config = new ProducerConfig(props)
>     val producer = new Producer[String, String](config)
>
>     val x = p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
>
>     producer.send(new KeyedMessage[String, String]("trade", x))
>
>     //p.collect().foreach(print)
>
>     //ssc.start()
>
>     //ssc.awaitTermination()
>
>   }
> }