BTW, in Spark 1.4, announced today, I added SQLContext.getOrCreate, so you
don't need to create the singleton yourself.
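
For example, a minimal sketch against the 1.4 API (the "dstream" variable
and the "events" table name here are placeholders, not from this thread):

    import org.apache.spark.sql.SQLContext

    dstream.foreachRDD { rdd =>
      // Reuses the SQLContext already tied to this SparkContext,
      // creating it on first use.
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      sqlContext.jsonRDD(rdd).registerTempTable("events")
      sqlContext.sql("SELECT COUNT(*) FROM events").collect().foreach(println)
    }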

On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio <
drarse.a...@gmail.com> wrote:

> Note: CCing user@spark.apache.org
>
>
> First, you must check if the RDD is empty:
>
> messages.foreachRDD { rdd =>
>   if (!rdd.isEmpty()) {
>     // ...
>   }
> }
>
> Now you can obtain the SQLContext instance:
>
> val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>
>
>
>
> *Optional*
> At this point I like to work with DataFrames, so I convert the RDD to a
> DataFrame. I see that you receive JSON:
>
> val df: DataFrame = sqlContext.jsonRDD(message, getSchema(getSchemaStr()))
>
>
> My getSchema function creates the schema for my JSON:
>
> def getSchemaStr(): String = "feature1 feature2 ..."
>
> def getSchema(schema: String): StructType =
>   StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
>
> I hope this helps.
>
> Regards.
>
>
>
> 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] <
> ml-node+s1001560n23226...@n3.nabble.com>:
>
>> I don't know why you said “Why? I tried this solution and it works fine.”
>> Does that mean your SQLContext instance stays alive for the whole streaming
>> application's lifetime, rather than for one batch duration? My code is below:
>>
>>
>> object SQLContextSingleton extends java.io.Serializable{
>>   @transient private var instance: SQLContext = null
>>
>>   // Instantiate SQLContext on demand
>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>     if (instance == null) {
>>       instance = new SQLContext(sparkContext)
>>     }
>>     instance
>>   }
>> }
>>
>> // type_->typex, id_->id, url_->url
>> case class dddd(time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable
>> case class Count(x: Int)
>>
>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>> ssc.checkpoint(".")
>>
>> val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092")
>> @transient val dstream = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>> @transient val dddstream = dstream.map(x => x._2).flatMap(x => x.split("\n"))
>>
>> dddstream.foreachRDD { rdd =>
>>   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>>   sqlContext.jsonRDD(rdd).registerTempTable("ttable")
>>   val ret = sqlContext.sql("SELECT COUNT(*) FROM ttable")
>>   ret.foreach { x => println(x(0)) }
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
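>>
>> One caveat, assuming some batches can be empty: jsonRDD may fail to infer
>> a schema from an empty RDD, so the isEmpty() guard suggested earlier in
>> the thread is worth keeping here too, e.g.:
>>
>> dddstream.foreachRDD { rdd =>
>>   if (!rdd.isEmpty()) {   // skip empty batches before schema inference
>>     val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>>     sqlContext.jsonRDD(rdd).registerTempTable("ttable")
>>   }
>> }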
>>
>> On 2015-06-09 17:41:44, "drarse [via Apache Spark User List]" wrote:
>>
>> Why? I tried this solution and it works fine.
>>
>> On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List] wrote:
>>
>>> Hi drarse, thanks for replying. The way you suggested, using a singleton
>>> object, does not work.
>>>
>>>
>>>
>>>
>>> On 2015-06-09 16:24:25, "drarse [via Apache Spark User List]" wrote:
>>>
>>> The best way is to create a singleton object like:
>>>
>>>> object SQLContextSingleton {
>>>>   @transient private var instance: SQLContext = null
>>>>
>>>>   // Instantiate SQLContext on demand
>>>>   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
>>>>     if (instance == null) {
>>>>       instance = new SQLContext(sparkContext)
>>>>     }
>>>>     instance
>>>>   }
>>>> }
>>>>
>>>> You can find more information in the programming guide:
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>>
>>>
>>>
>>> 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List]:
>>>
>>>> I used SQLContext in a spark streaming application as below:
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------
>>>>
>>>> case class topic_name (f1: Int, f2: Int)
>>>>
>>>> val sqlContext = new SQLContext(sc)
>>>> @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
>>>> ssc.checkpoint(".")
>>>> val theDStream = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
>>>>
>>>> theDStream.map(x => x._2).foreachRDD { rdd =>
>>>>   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
>>>>   sqlContext.sql("select count(*) from topic_name").foreach { x =>
>>>>     WriteToFile("file_path", x(0).toString)
>>>>   }
>>>> }
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>> ----------------------------------------------------------------------------------------------------------------
>>>>
>>>>
>>>> I found I could only get each 5-second batch's message count, because
>>>> "The lifetime of this temporary table is tied to the SQLContext that was
>>>> used to create this DataFrame". I guess a new sqlContext is created every
>>>> 5 seconds and the temporary table only stays alive for those 5 seconds. I
>>>> want the sqlContext and the temporary table to stay alive for the whole
>>>> streaming application's life cycle. How can I do that?
>>>>
>>>> Thanks~
>>>>
>> --
>> Sincerely, Sergio Jiménez