I think you need to start your streaming job first and then put the files into
the directory. textFileStream doesn't pick up files that already exist when the
stream starts, I believe; it only processes files that appear afterwards.
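
For reference, here is a minimal sketch of the order of operations I mean,
reusing the context setup and directory path from your code below (the
filename in the comment is just a placeholder):

    JavaStreamingContext ssc =
        new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
    // Register the input stream before starting the context.
    JavaDStream<String> lines = ssc.textFileStream("/user/huser/user/huser/flume");
    lines.print();
    ssc.start();              // start the job first...
    // ...then move NEW files into the monitored directory, e.g.:
    //   hdfs dfs -put data.txt /user/huser/user/huser/flume/
    ssc.awaitTermination();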

Also, are you sure the path isn't missing a / at the beginning? Shouldn't it
be the following?

JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");
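
If HDFS isn't your default filesystem, you may also need the fully qualified
URI; the host and port here are placeholders for your NameNode:

JavaDStream<String> textStream =
    ssc.textFileStream("hdfs://<namenode-host>:<port>/user/huser/user/huser/flume");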


Thanks
Best Regards

On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <
jeniba.john...@lntinfotech.com> wrote:

>
> Hi Hari,
>
> I am trying to read data from a file stored in HDFS. Flume tails the data
> and stores it in HDFS. Now I want to read this data using textFileStream,
> but with the code below I am not able to fetch the data from the file in
> HDFS. Can anyone help me with this issue?
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.regex.Pattern;
>
> public final class Test1 {
>   public static void main(String[] args) throws Exception {
>
>     SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
>     JavaStreamingContext ssc =
>         new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
>
>     // Data directory path in HDFS
>     JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");
>
>     JavaDStream<String> suspectedStream =
>         textStream.flatMap(new FlatMapFunction<String, String>() {
>           public Iterable<String> call(String line) throws Exception {
>             return Lists.newArrayList(line);
>           }
>         });
>
>     suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
>       public Void call(JavaRDD<String> rdd) throws Exception {
>         List<String> output = rdd.collect();
>         System.out.println("Sentences Collected from Flume " + output);
>         return null;
>       }
>     });
>
>     suspectedStream.print();
>
>     System.out.println("Welcome TO Flume Streaming");
>     ssc.start();
>     ssc.awaitTermination();
>   }
>
> }
>
> The command I use is:
> ./bin/spark-submit --verbose \
>   --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar \
>   --master local[*] --deploy-mode client \
>   --class xyz.Test1 bin/filestream3.jar
>
>
>
>
>
> Regards,
> Jeniba Johnson
>
