Hi All, 

I am new bee in spark.

I managed to write up kafka prodcuder in spark where data comes from
Cassandra table but I have few questions.

Spark data output from Cassandra looks like below.

[2,Joe,Smith]
[1,Barack,Obama]

I would like something like this in my kafka consumer, so need to remove []
at the beginning and end

2~Joe~Smith
1~Barack~Obama

Also when I collect on rdd and add mkstring(",") two lines of data are
getting combined as one 
(Ex:- [2,Joe,Smith][1,Barack,Obama]), so I used mkstring("\n") so now data
looks like
[2,Joe,Smith]
[1,Barack,Obama]

but I need something like this in my kafka consumer when I receive data any
idea.

2~Joe~Smith
1~Barack~Obama


Spark Code:-

package com.examples

/**
 * Created by kalit_000 on 22/09/2015.
 */

import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkProducerDBCassandra {

  case class TestTable (TRADE_ID:String,TRADE_PRICE: String)

  def main(args: Array[String]): Unit =
  {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new
SparkConf().setMaster("local[2]").setAppName("testkali2").set("spark.cassandra.connection.host",
"127.0.0.1")
    val sc=new SparkContext("local","test",conf)

    print("Test kali Spark Cassandra")

    val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)

    val p=cc.sql("select * from people.person")

    p.collect().foreach(println)

    val props:Properties = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config= new ProducerConfig(props)
    val producer= new Producer[String,String](config)

    val x=p.collect().mkString("\n")

    producer.send(new KeyedMessage[String, String]("trade", x))

  }
}

Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to