[
https://issues.apache.org/jira/browse/SPARK-1797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
QingFeng Zhang updated SPARK-1797:
----------------------------------
Description:
when I put 200 png files to Hdfs , I found sparkStreaming counld detect 200
files , but the sum of rdd.count() is less than 200, always between 130 and
170, I don't know why...Is this a Bug?
PS: When I put 200 files in hdfs before streaming run , It get the correct
count and right result.
def main(args: Array[String]) {
val conf = new SparkConf().setMaster(SparkURL)
.setAppName("QimageStreaming-broadcast")
.setSparkHome(System.getenv("SPARK_HOME"))
.setJars(SparkContext.jarOfClass(this.getClass()))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
conf.set("spark.kryoserializer.buffer.mb", "10");
val ssc = new StreamingContext(conf, Seconds(2))
val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
val input_path = HdfsURL + "/Qimage/input"
val output_path = HdfsURL + "/Qimage/output/"
val bg_path = HdfsURL + "/Qimage/bg/"
val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
QimageInputFormat[Text, Qimage]](bg_path)
val bbg = bg.map(data => (data._1.toString(), data._2))
val broadcastbg = ssc.sparkContext.broadcast(bbg)
val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text,
Qimage]](input_path)
val qingbg = broadcastbg.value.collectAsMap
val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
val rddnum = rdd.count
System.out.println("\n\n"+ "rddnum is " + rddnum + "\n\n")
if (rddnum > 0) {
System.out.println("here is foreachFunc")
val a = rdd.keys
val b = a.first
val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
rdd.map(data => (data._1, (new QimageProc(data._1,
data._2)).koutu(cbg)))
.saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
outputFormatClass)
}
}
file.foreachRDD(foreachFunc)
ssc.start()
ssc.awaitTermination()
}
was:
when I put 200 png files to Hdfs , I found sparkStreaming counld detect 200
files , but the sum of rdd.count() is less than 200, always between 130 and
170, I don't know why...Is this a Bug?
PS: When I put 200 files in hdfs before streaming run , It get the correct
count and right result.
def main(args: Array[String]) {
val conf = new SparkConf().setMaster(SparkURL)
.setAppName("QimageStreaming-broadcast")
.setSparkHome(System.getenv("SPARK_HOME"))
.setJars(SparkContext.jarOfClass(this.getClass()))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
conf.set("spark.kryoserializer.buffer.mb", "10");
val ssc = new StreamingContext(conf, Seconds(2))
val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
val input_path = HdfsURL + "/Qimage/input"
val output_path = HdfsURL + "/Qimage/output/"
val bg_path = HdfsURL + "/Qimage/bg/"
val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
QimageInputFormat[Text, Qimage]](bg_path)
val bbg = bg.map(data => (data._1.toString(), data._2))
val broadcastbg = ssc.sparkContext.broadcast(bbg)
val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text,
Qimage]](input_path)
val qingbg = broadcastbg.value.collectAsMap
val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
val rddnum = rdd.count
System.out.println("\n\n"+ "rddnum is " + rddnum + "\n\n")
if (rddnum > 0) {
System.out.println("here is foreachFunc")
val a = rdd.keys
val b = a.first
val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
rdd.map(data => (data._1, (new QimageProc(data._1,
data._2)).koutu(cbg)))
.saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
outputFormatClass)
}
}
file.foreachRDD(foreachFunc)
ssc.start()
ssc.awaitTermination()
}
> streaming on hdfs can detected all new file, but the sum of all the
> rdd.count() not equals which had detected
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-1797
> URL: https://issues.apache.org/jira/browse/SPARK-1797
> Project: Spark
> Issue Type: Bug
> Affects Versions: 0.9.0
> Environment: spark0.9.0,hadoop2.3.0,1 Master,5 Slaves.
> Reporter: QingFeng Zhang
>
> when I put 200 png files to Hdfs , I found sparkStreaming counld detect 200
> files , but the sum of rdd.count() is less than 200, always between 130 and
> 170, I don't know why...Is this a Bug?
> PS: When I put 200 files in hdfs before streaming run , It get the correct
> count and right result.
> def main(args: Array[String]) {
> val conf = new SparkConf().setMaster(SparkURL)
> .setAppName("QimageStreaming-broadcast")
> .setSparkHome(System.getenv("SPARK_HOME"))
> .setJars(SparkContext.jarOfClass(this.getClass()))
> conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
> conf.set("spark.kryoserializer.buffer.mb", "10");
> val ssc = new StreamingContext(conf, Seconds(2))
> val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
> val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
> val input_path = HdfsURL + "/Qimage/input"
> val output_path = HdfsURL + "/Qimage/output/"
> val bg_path = HdfsURL + "/Qimage/bg/"
> val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
> QimageInputFormat[Text, Qimage]](bg_path)
> val bbg = bg.map(data => (data._1.toString(), data._2))
> val broadcastbg = ssc.sparkContext.broadcast(bbg)
> val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text,
> Qimage]](input_path)
> val qingbg = broadcastbg.value.collectAsMap
> val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
> val rddnum = rdd.count
> System.out.println("\n\n"+ "rddnum is " + rddnum + "\n\n")
> if (rddnum > 0) {
> System.out.println("here is foreachFunc")
> val a = rdd.keys
> val b = a.first
> val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
> rdd.map(data => (data._1, (new QimageProc(data._1,
> data._2)).koutu(cbg)))
> .saveAsNewAPIHadoopFile(output_path, classOf[Text],
> classOf[Qimage], outputFormatClass)
> }
> }
> file.foreachRDD(foreachFunc)
> ssc.start()
> ssc.awaitTermination()
> }
--
This message was sent by Atlassian JIRA
(v6.2#6252)