the data can be written as parquet into HDFS. But the loading data process is 
not working as expected.

--------------------------------

 

Thanks&Best regards!
San.Luo

----- 原始邮件 -----
发件人:<luohui20...@sina.com>
收件人:"user" <user@spark.apache.org>
主题:[SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL 
table
日期:2016年09月05日 18点55分

hi guys:     I got a question that  my SparkStreaming APP can not loading data 
into SparkSQL table in. Here is my code:
    val conf = new SparkConf().setAppName("KafkaStreaming for " + 
topics).setMaster("spark://master60:7077")
    val storageLevel = StorageLevel.DISK_ONLY
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Receiver-based 
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
storageLevel)

    kafkaStream.foreachRDD { rdd =>
      val x = rdd.count()
      println(s"================processing $x records=================")
      rdd.collect().foreach(println)
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val logRDD = 
sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
      val logRDD2 = logRDD.map(_.split(',')).map { x =>
        NginxLog(x(0).trim().toFloat.toInt,
          x(1).trim(),
          x(2).trim(),
          x(3).trim(),
          x(4).trim(),
          x(5).trim(),
          x(6).trim(),
          x(7).trim(),
          x(8).trim(),
          x(9).trim(),
          x(10).trim())
      }
      val recDF = logRDD2.toDF
      recDF.printSchema()

      val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
      val index = rdd.id
      recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
      hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int,remote_addr 
String,u_domain String,u_url String,u_title String,u_referrer String,u_sh 
String,u_sw String,u_cd String,u_lang String,u_utrace String) STORED AS 
PARQUET")      hc.sql(s"LOAD DATA INPATH 
'/etl/tables/nginxlog/${topicNO}/${index}' INTO TABLE nginxlog")    }

There isn't any exception during running my APP. however, except the data in 
the first batch could be loaded into table nginxlog, all other batches can not 
be successfully loaded.I can not understand the reason of this kind of 
behavior. Is that my (hc)hivecontext issue?
PS.my spark cluster version: 1.6.1




--------------------------------

 

Thanks&amp;Best regards!
San.Luo

Reply via email to