[ https://issues.apache.org/jira/browse/SPARK-20488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
SiddharthRam updated SPARK-20488:
---------------------------------
Description:
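Please suggest the right way to do this. Below is one attempt from my end.
I'm using Spark 2.1.0 (submitted with spark-submit). Below is a snippet of the
code I'm using; it always throws a NullPointerException when it tries to create
a DataFrame or Dataset.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName(Kafka_topic)
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
  .builder
  .config(sparkConf)
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // one tuple per Kafka record: (key, value, partition, offset)
    val rdm = rdd.map(record =>
      (record.key(), record.value(), record.partition(), record.offset()))
    rdm.foreach { rda =>
      // bunch of code, works fine
      // load JSON from a file or a String (load example);
      // creating the DataFrame here is what fails with the NullPointerException
      var sqlContext = spark.read.json(json)
    }
  }
}
ssc.start()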
was:
The same description as above, except that the JSON load line read:
var sqlContext = spark.read.json((compact(render(json)))) [ load example ]
> How to create a DataFrame or Dataset in a streaming context (Spark 2.1.0 on
> Mesos 1.0.1)
> --------------------------------------------------------------------------------------
>
> Key: SPARK-20488
> URL: https://issues.apache.org/jira/browse/SPARK-20488
> Project: Spark
> Issue Type: Question
> Components: Spark Submit
> Affects Versions: 2.1.0
> Reporter: SiddharthRam
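A note on the likely cause, offered as a sketch rather than a confirmed diagnosis: in the snippet above, spark.read.json is called inside rdm.foreach, and that closure runs on the executors. A SparkSession is only usable on the driver, so referencing it from executor code commonly surfaces as a NullPointerException. The body of foreachRDD itself runs on the driver once per batch, so the DataFrame can be built there from the whole batch. The sketch below assumes the Kafka record values are JSON strings; the names StreamToDataFrame, jsonBatchToDF, process and the temp view name "batch" are made up for illustration.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; none of these names come from the original report.
object StreamToDataFrame {

  // Driver-side: turn one micro-batch of JSON strings into a DataFrame.
  // Uses the read.json(RDD[String]) overload that exists in Spark 2.1.0.
  def jsonBatchToDF(spark: SparkSession, jsonLines: RDD[String]): DataFrame =
    spark.read.json(jsonLines)

  def process(spark: SparkSession,
              stream: DStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      // This block runs on the driver once per batch, so `spark` is valid here.
      // Extract plain strings first: ConsumerRecord itself is not serializable.
      val values = rdd.map(_.value())
      if (!values.isEmpty()) {
        val df = jsonBatchToDF(spark, values)
        // Assumed view name, used only to show that SQL works on the batch.
        df.createOrReplaceTempView("batch")
        spark.sql("SELECT count(*) AS n FROM batch").show()
      }
    }
}
```

With the stream from the description, this would be wired up as StreamToDataFrame.process(spark, stream) before ssc.start(). The RDD[String] overload of read.json is the one available in Spark 2.1.0; later releases also accept a Dataset[String].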