Michael any idea on this?
________________________________________
From: Jahagirdar, Madhu
Sent: Thursday, November 06, 2014 2:36 PM
To: mich...@databricks.com; user
Subject: CheckPoint Issue with JsonRDD

When we enable checkpoint and use JsonRDD we get the following error: Is this 
bug ?


Exception in thread "main" java.lang.NullPointerException
                at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
                at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:103)
                at 
org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:132)
                at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:194)
                at 
SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:69)
                at 
SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:63)
                at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
                at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
                at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
                at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
                at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
                at scala.util.Try$.apply(Try.scala:161)
                at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
                at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                at java.lang.Thread.run(Thread.java:745)

=============================

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.types.{StructType, StructField, StringType}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}


object SparkStreamingToParquet extends Logging {


  /**
   *
   * @param args
   * @throws Exception
   */
  def main(args: Array[String]) {
    if (args.length < 3) {
      logInfo("Please provide valid parameters: <hdfsFilesLocation: 
hdfs://ip:8020/user/hdfs/--/> <IMPALAtableloc hdfs://ip:8020/user/hive/--/> 
<tablename>")
      logInfo("make user you give full folder path with '/' at the end i.e 
/user/hdfs/abc/")
      System.exit(1)
    }
    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC  = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)

    val jssc: StreamingContext = StreamingContext.getOrCreate(CHECKPOINT_DIR, 
()=>{
      createContext(args)
    })

    jssc.start
    jssc.awaitTermination
  }


  def createContext(args:Array[String]): StreamingContext = {

    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC  = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)

    val sparkConf: SparkConf = new SparkConf().setAppName("Json to 
Parquet").set("spark.cores.max", "3")

    val jssc: StreamingContext = new StreamingContext(sparkConf, new 
Duration(30000))

    val hivecontext: HiveContext = new HiveContext(jssc.sparkContext)
    
hivecontext.createParquetFile[Person](IMPALA_TABLE_LOC,true,org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME);

    val schemaString = "name age"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, 
StringType, true)))

    val textFileStream = jssc.textFileStream(HDFS_FILE_LOC)

    textFileStream.foreachRDD(rdd => {
      if(rdd !=null && rdd.count()>0) {
          val schRdd =  hivecontext.jsonRDD(rdd,schema)
          logInfo("inserting into table: " + TEMP_TABLE_NAME)
          schRdd.insertInto(TEMP_TABLE_NAME)
      }
    })
    jssc.checkpoint(CHECKPOINT_DIR)
    jssc
  }
}

====

case class Person(name:String, age:String) extends Serializable

Regards,
Madhu jahagirdar

________________________________
The information contained in this message may be confidential and legally 
protected under applicable law. The message is intended solely for the 
addressee(s). If you are not the intended recipient, you are hereby notified 
that any use, forwarding, dissemination, or reproduction of this message is 
strictly prohibited and may be unlawful. If you are not the intended recipient, 
please contact the sender by return e-mail and destroy all copies of the 
original message.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to