I think you need to start your streaming job first and only then put the files into the directory; textFileStream doesn't pick up files that already exist when the stream starts, I believe.
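Another thing that often bites people with the file stream source: each file has to appear in the monitored directory *atomically*, after the job is running. The usual pattern is to write to a staging directory and then rename into place. A minimal local sketch of that pattern (directory names here are illustrative stand-ins for your HDFS path; on HDFS you would use `hdfs dfs -put` to a temporary name followed by `hdfs dfs -mv`):

```shell
# Stand-ins for the real HDFS directories.
WATCH_DIR=$(mktemp -d)    # the directory the stream monitors
STAGE_DIR=$(mktemp -d)    # staging area outside the watched directory

# Write the file outside the watched directory first...
echo "event line 1" > "$STAGE_DIR/part-0001.txt"

# ...then rename it in. The rename is atomic, so the stream never
# sees a half-written file.
mv "$STAGE_DIR/part-0001.txt" "$WATCH_DIR/"

ls "$WATCH_DIR"
```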
Also, are you sure the path isn't the following (no missing / in the beginning)?

JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");

Thanks
Best Regards

On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <jeniba.john...@lntinfotech.com> wrote:

> Hi Hari,
>
> I am trying to read data from a file which is stored in HDFS. Using Flume
> the data is tailed and stored in HDFS. Now I want to read this data using
> textFileStream. Using the below mentioned code I am not able to fetch the
> data from a file which is stored in HDFS. Can anyone help me with this issue?
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.regex.Pattern;
>
> public final class Test1 {
>     public static void main(String[] args) throws Exception {
>
>         SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
>         JavaStreamingContext ssc = new JavaStreamingContext("local[4]",
>                 "JavaWordCount", new Duration(20000));
>
>         // Data directory path in HDFS
>         JavaDStream<String> textStream =
>                 ssc.textFileStream("user/huser/user/huser/flume");
>
>         JavaDStream<String> suspectedStream =
>                 textStream.flatMap(new FlatMapFunction<String, String>() {
>             public Iterable<String> call(String line) throws Exception {
>                 // return Arrays.asList(line.toString().toString());
>                 return Lists.newArrayList(line.toString().toString());
>             }
>         });
>
>         suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
>             public Void call(JavaRDD<String> rdd) throws Exception {
>                 List<String> output = rdd.collect();
>                 System.out.println("Sentences Collected from Flume " + output);
>                 return null;
>             }
>         });
>
>         suspectedStream.print();
>
>         System.out.println("Welcome TO Flume Streaming");
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
>
> The command I use is:
>
> ./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar
>
> Regards,
> Jeniba Johnson
>
> ________________________________
> The contents of this e-mail and any attachment(s) may contain confidential
> or privileged information for the intended recipient(s). Unintended
> recipients are prohibited from taking action on the basis of information in
> this e-mail and using or disseminating the information, and must notify the
> sender and delete it from their system. L&T Infotech will not accept
> responsibility or liability for the accuracy or completeness of, or the
> presence of any virus or disabling code in this e-mail"
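P.S. The leading-slash distinction matters because a path without it is resolved relative to the submitting user's HDFS home directory, which is likely how "/user/huser/user/huser/flume" came about in the first place. A quick illustration with plain java.net.URI (the paths below are illustrative, not your actual directories):

```java
import java.net.URI;

public class PathCheck {
    public static void main(String[] args) {
        // No leading slash: a relative path. HDFS would resolve this
        // against the user's home dir, e.g. /user/huser/user/huser/flume.
        URI relative = URI.create("user/huser/flume");
        System.out.println(relative.getPath());   // user/huser/flume

        // Leading slash: an absolute path within the default filesystem.
        URI absolute = URI.create("/user/huser/flume");
        System.out.println(absolute.getPath());   // /user/huser/flume

        // Fully qualified: scheme and authority pin it to a specific
        // namenode, which removes any ambiguity.
        URI qualified = URI.create("hdfs://namenode:8020/user/huser/flume");
        System.out.println(qualified.getScheme()); // hdfs
        System.out.println(qualified.getPath());   // /user/huser/flume
    }
}
```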