[ 
https://issues.apache.org/jira/browse/SPARK-20488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SiddharthRam updated SPARK-20488:
---------------------------------
    Description: 
Please suggest the right way 

below is one attempt from my end :
i`m using spark 2.1.0 (spark-submit job) and below is a snippet of the code i`m 
using and always getting a null pointer exception when trying to create a 
dataframe or dataset 

      val sparkConf = new SparkConf().setAppName(Kafka_topic)
      val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
      val spark = SparkSession
                  .builder
                  .config(sparkConf)
                  .config("spark.sql.warehouse.dir", warehouseLocation)
                  .enableHiveSupport()
                  .getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext,Seconds(1))

      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)

stream.foreachRDD( rdd => {
        if(!rdd.isEmpty()) {
          val rdm = rdd.map(rdd => 
(rdd.key(),rdd.value(),rdd.partition(),rdd.offset()))
          rdm.foreach(rda =>  {
// bunch of code works fine 
//load json from file or String 
 var sqlContext =  spark.read.json(json) [ load example ]
}
})
}
ssc.start()


  was:
Please suggest the right way 

below is one attempt from my end :
i`m using spark 2.1.0 (spark-submit job) and below is a snippet of the code i`m 
using and always getting a null pointer exception when trying to create a 
dataframe or dataset 

      val sparkConf = new SparkConf().setAppName(Kafka_topic)
      val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
      val spark = SparkSession
                  .builder
                  .config(sparkConf)
                  .config("spark.sql.warehouse.dir", warehouseLocation)
                  .enableHiveSupport()
                  .getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext,Seconds(1))

      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)

stream.foreachRDD( rdd => {
        if(!rdd.isEmpty()) {
          val rdm = rdd.map(rdd => 
(rdd.key(),rdd.value(),rdd.partition(),rdd.offset()))
          rdm.foreach(rda =>  {
// bunch of code works fine 
//load json from file or String 
 var sqlContext =  spark.read.json((compact(render(json)))) [ load example ]
}
})
}
ssc.start()



> how to create a dataframe or dataset in a streaming context spark 2.1.0 on 
> mesos 1.0,1
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-20488
>                 URL: https://issues.apache.org/jira/browse/SPARK-20488
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Submit
>    Affects Versions: 2.1.0
>            Reporter: SiddharthRam
>
> Please suggest the right way 
> below is one attempt from my end :
> i`m using spark 2.1.0 (spark-submit job) and below is a snippet of the code 
> i`m using and always getting a null pointer exception when trying to create a 
> dataframe or dataset 
>       val sparkConf = new SparkConf().setAppName(Kafka_topic)
>       val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
>       val spark = SparkSession
>                   .builder
>                   .config(sparkConf)
>                   .config("spark.sql.warehouse.dir", warehouseLocation)
>                   .enableHiveSupport()
>                   .getOrCreate()
>       val ssc = new StreamingContext(spark.sparkContext,Seconds(1))
>       val stream = KafkaUtils.createDirectStream[String, String](
>         ssc,
>         PreferConsistent,
>         Subscribe[String, String](topics, kafkaParams)
> stream.foreachRDD( rdd => {
>         if(!rdd.isEmpty()) {
>           val rdm = rdd.map(rdd => 
> (rdd.key(),rdd.value(),rdd.partition(),rdd.offset()))
>           rdm.foreach(rda =>  {
> // bunch of code works fine 
> //load json from file or String 
>  var sqlContext =  spark.read.json(json) [ load example ]
> }
> })
> }
> ssc.start()



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to