Hi,

The aim here is as follows:

   - read data from Socket using Spark Streaming every N seconds

   - register received data as SQL table

   - additional reference data will be read from HDFS etc. and also
registered as SQL tables

   - the idea is to perform arbitrary SQL queries on the combined streaming
& reference data

Please see the code snippet below. Data is written out to disk correctly
from "inside" the foreachRDD loop, but the same registered SQL table is
empty when queried and written "outside" of the foreachRDD loop.

Please share your opinions/suggestions on how to fix this. Any other
mechanism to achieve the stated "aim" above is also welcome.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

case class Record(id: Int, status: String, source: String)

object SqlApp2 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SqlApp2").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // Create the streaming context with a 10 second batch size
    val ssc = new StreamingContext(sc, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    var alldata: DataFrame = sqlContext.emptyDataFrame
    alldata.registerTempTable("alldata")

    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      import sqlContext.implicits._

      // Convert RDD[String] to DataFrame
      val data = rdd.map { w =>
        val words = w.split(" ")
        Record(words(0).toInt, words(1), words(2))
      }.toDF()

      // Register as table
      data.registerTempTable("alldata")
      // this data is written properly
      data.save("inside/file" + System.currentTimeMillis(), "json", SaveMode.ErrorIfExists)
    })

    val dataOutside = sqlContext.sql("select * from alldata")
    // this data is empty: how can the table registered inside foreachRDD
    // be made visible to the rest of the application?
    dataOutside.save("outside/file" + System.currentTimeMillis(), "json", SaveMode.ErrorIfExists)

    ssc.start()
    ssc.awaitTermination()
  }
}
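For reference, the per-line parsing done inside the foreachRDD loop above can
be exercised on its own in plain Scala (no Spark needed); the `parseLine`
helper name below is mine, not part of the original code, and it assumes
lines are space-separated triples as in the snippet:

```scala
// Same Record shape and split logic as used inside foreachRDD above.
case class Record(id: Int, status: String, source: String)

// Hypothetical helper wrapping the map function's body for standalone testing.
def parseLine(w: String): Record = {
  val words = w.split(" ")
  Record(words(0).toInt, words(1), words(2))
}

val r = parseLine("42 OK socket")
println(r)  // Record(42,OK,socket)
```

This makes it easy to confirm the input format is parsed as intended before
pointing the socket stream at real data.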

Thanks & Regards

MK



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Make-Spark-Streaming-DStream-as-SQL-table-tp25699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
