fileStream has a parameter "newFilesOnly". By default it is true, which means
only new files are processed and existing files in the directory are ignored.
So you need to ***move*** the files into the directory after the stream has
started; otherwise they count as existing files and will be ignored.

You can also set "newFilesOnly" to false; then the first batch will process
all of the existing files.
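
For example, here is a minimal sketch against the code you quoted below. It
uses the fileStream overload that takes a path filter and the newFilesOnly
flag; inputDirectory and ssc are the same values as in your program, and the
accept-all filter is just for illustration:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Pass an accept-all path filter and newFilesOnly = false so that the first
// batch also picks up files already sitting in the directory.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  inputDirectory,
  (path: Path) => true,   // keep every file the directory scan finds
  newFilesOnly = false
).map { case (offset, text) => (offset.toString, text.toString) }

With newFilesOnly left at its default of true, you would instead move files
into the directory after ssc.start() so they are seen as new files.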

On Thu, Jan 28, 2016 at 4:22 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am trying to run the HdfsWordCount example from GitHub.
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
>
> I am using Ubuntu to run the program, but I don't see any data getting
> printed after:
> -------------------------------------------
> Time: 1454026800000 ms
> -------------------------------------------
>
> I don't see any errors; the program just runs, but I do not see any output
> corresponding to the file used.
>
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Minutes, StreamingContext}
>
> object HdfsStream {
>
>   def main(args: Array[String]): Unit = {
>
>     val sparkConf = new SparkConf().setAppName("SpoolDirSpark").setMaster("local[5]")
>     val ssc = new StreamingContext(sparkConf, Minutes(10))
>
>     //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark"
>     //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark/test.txt"
>     val inputDirectory = "file:///home/satyajit/jsondata/"
>
>     val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory)
>       .map { case (x, y) => (x.toString, y.toString) }
>     //lines.saveAsTextFiles("hdfs://localhost:9000/SpoolDirSpark/datacheck")
>     lines.saveAsTextFiles("file:///home/satyajit/jsondata/")
>
>     println("check_data" + lines.print())
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> I would like to know if there is any workaround, or if there is something I
> am missing.
>
> Thanks in advance,
> Satyajit.
>
