[ https://issues.apache.org/jira/browse/SPARK-8440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bartek updated SPARK-8440:
--------------------------
    Description: 
Hi,

I'm trying to run a simple Kafka Spark Streaming example in spark-shell:

{code}
// Stop the SparkContext that spark-shell created so we can build our own.
sc.stop

import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "127.0.0.1:2181",
  "group.id" -> "test")

// Receiver-based stream reading raw bytes from the "test" topic
// (one receiver thread); keep only the message values.
val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

messages.foreachRDD { pairRDD =>
  // These two lines run on the driver, once per batch.
  println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
  println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
  // This closure runs on the workers.
  pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
}

ssc.start()
ssc.awaitTermination()
{code}
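
One note about the closure above: pairRDD.foreach(println) executes on the workers, so on a real cluster the per-row output ends up in the executors' stdout logs rather than in the driver console. A common pattern for inspecting a few records on the driver instead is shown below (a sketch; take(10) is an arbitrary limit, not part of the original code):

{code}
// Bring a small sample of records back to the driver and print it there.
pairRDD.take(10).foreach(row => println(s"DataListener.listen() [row = ${row}]"))
{code}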

In the Spark output I can only find the line logged by:
{code}
println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
{code}

but unfortunately I can't find any output from:
{code}
println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
println(s"DataListener.listen() [row = ${row}]")
{code}

Here is my full spark-shell output: http://pastebin.com/sfxbYYga

Any ideas what I'm doing wrong? Thanks!
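
A likely cause, per the Spark Streaming programming guide's note on running locally: setMaster("local") gives the application a single thread, and the receiver created by KafkaUtils.createStream occupies it permanently, leaving no thread to process the received batches, so the count() and per-row printlns never run. A minimal sketch of the change (only the master URL differs from the code above):

{code}
// Use at least two local threads: one for the Kafka receiver,
// the rest for processing the received batches.
val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local[2]")
{code}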

> Kafka Spark Streaming working example
> -------------------------------------
>
>                 Key: SPARK-8440
>                 URL: https://issues.apache.org/jira/browse/SPARK-8440
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.2.2, 1.4.0
>            Reporter: Bartek


