fileStream is not designed to work with a continuously updated file. One
of the main design goals of Spark is immutability (to guarantee fault
tolerance through recomputation), and a file that is being appended to
(mutated) defeats that. It is instead designed to pick up new files that
are added atomically (using a move) to a directory. So to make it work
with your continuously updated file, you will probably have to write
something that periodically rotates the continuously updated log file into
separate files, and then moves those files into the monitored directory.
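
To make the rotation idea concrete, here is a rough sketch in plain Java
(no Spark dependency). Everything in it is an assumption for illustration:
the class name LogChunker, the method publishNewLines, the "chunk-N.log"
naming, and the use of a separate staging directory are all made up here.
It remembers how far into the log it has read, writes the new complete
lines to a file in the staging directory, and then moves that file into the
watched directory in one step. The atomic move only works if both
directories are on the same filesystem.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Hypothetical helper: cuts a growing log into immutable chunk files.
class LogChunker {
    private final Path logFile;     // the continuously appended log
    private final Path stagingDir;  // scratch directory, same filesystem as watchedDir
    private final Path watchedDir;  // directory the streaming job monitors
    private long offset = 0L;       // bytes of the log already published
    private int sequence = 0;       // used only to build unique chunk names

    LogChunker(Path logFile, Path stagingDir, Path watchedDir) {
        this.logFile = logFile;
        this.stagingDir = stagingDir;
        this.watchedDir = watchedDir;
    }

    /**
     * Publishes the complete lines appended since the last call.
     * Returns the new file in watchedDir, or null if no complete line is available.
     */
    Path publishNewLines() throws IOException {
        byte[] chunk;
        try (RandomAccessFile raf = new RandomAccessFile(logFile.toFile(), "r")) {
            long end = raf.length();
            if (end <= offset) {
                return null; // nothing new (log truncation is not handled in this sketch)
            }
            // Assumes the unread part fits in memory; a real tool would stream it.
            byte[] buf = new byte[(int) (end - offset)];
            raf.seek(offset);
            raf.readFully(buf);

            // Stop at the last newline so a half-written line is left for next time.
            int lastNewline = buf.length - 1;
            while (lastNewline >= 0 && buf[lastNewline] != '\n') {
                lastNewline--;
            }
            if (lastNewline < 0) {
                return null;
            }
            chunk = Arrays.copyOf(buf, lastNewline + 1);
        }

        String name = "chunk-" + sequence + ".log";
        Path tmp = stagingDir.resolve(name);
        Files.write(tmp, chunk); // written outside the watched directory first

        // The file appears in the watched directory fully written, in one step.
        Path target = watchedDir.resolve(name);
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);

        offset += chunk.length;
        sequence++;
        return target;
    }
}
```

You would call publishNewLines() on a timer (say, once per batch interval)
and point the stream at the watched directory rather than at the log file
itself, for example with ssc.textFileStream(directory), which gives you a
JavaDStream<String> of lines.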

TD


On Wed, Jul 9, 2014 at 9:34 AM, Aravind <aravindb...@gmail.com> wrote:

> Hi Akil,
>
> It didn't work. Here is the code...
>
>
> package com.paypal;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.api.java.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.*;
> import org.apache.spark.streaming.api.java.*;
>
> import com.google.common.collect.Lists;
>
> import org.apache.spark.streaming.receiver.Receiver;
> import scala.Tuple2;
>
> import java.net.ConnectException;
> import java.net.Socket;
> import java.util.Arrays;
> import java.util.regex.Pattern;
> import java.io.*;
> /**
>  * Hello world!
>  *
>  */
> public class App3
> {
>     private static final Pattern SPACE = Pattern.compile(" ");
>
>     public static void main(String[] args) {
>
>         // Create the context with a 1 second batch size
>         SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
>
>         // ******* always give local[4] to execute and see the output
>         JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaNetworkWordCount", new Duration(5000));
>
> // throws an error saying requires JavaPairDstream and not JavaDstream.
>         JavaDStream<String> lines = ssc.fileStream("/Users/../Desktop/alarms.log");
>         JavaDStream<String> words = lines.flatMap(
>                 new FlatMapFunction<String, String>() {
>                     public Iterable<String> call(String s) {
>                         return Arrays.asList(s.split(" "));
>                     }
>                 }
>         );
>
>         JavaPairDStream<String, Integer> ones = words.map(
>                 new Function<String, Integer>() {
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2(s, 1);
>                     }
>                 }
>         );
>
>         JavaPairDStream<String, Integer> counts = ones.reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                     public Integer call(Integer i1, Integer i2) {
>                         return i1 + i2;
>                     }
>                 }
>         );
>
>
>         System.out.println("Hello world");
>         wordCounts.print();
>
>         ssc.start();
>         ssc.awaitTermination();
>     }
>
>
> }
>
> I am not able to figure out how to type-cast objects of type
> JavaPairDStream to JavaDStream. Can you provide working code for the
> same?
> Thanks in advance.
>
> Regards
> Aravindan
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115p9204.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
